You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/21 16:27:39 UTC
[43/52] [partial] ignite git commit: IGNITE-1513: Moved .Net.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
new file mode 100644
index 0000000..382ab1e
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
+ using System.Threading;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Impl.Compute;
+ using Apache.Ignite.Core.Impl.Events;
+ using Apache.Ignite.Core.Impl.Messaging;
+ using Apache.Ignite.Core.Impl.Portable;
+ using Apache.Ignite.Core.Impl.Portable.Metadata;
+ using Apache.Ignite.Core.Impl.Services;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Messaging;
+ using Apache.Ignite.Core.Portable;
+ using Apache.Ignite.Core.Services;
+ using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils;
+
+ /// <summary>
+ /// Ignite projection implementation.
+ /// </summary>
+ internal class ClusterGroupImpl : PlatformTarget, IClusterGroupEx
+ {
+ /** Attribute: platform. */
+ private const string AttrPlatform = "org.apache.ignite.platform";
+
+ /** Platform. */
+ private const string Platform = "dotnet";
+
+ /** Initial topver; invalid from Java perspective, so update will be triggered when this value is met. */
+ private const int TopVerInit = 0;
+
+ /** */
+ private const int OpAllMetadata = 1;
+
+ /** */
+ private const int OpForAttribute = 2;
+
+ /** */
+ private const int OpForCache = 3;
+
+ /** */
+ private const int OpForClient = 4;
+
+ /** */
+ private const int OpForData = 5;
+
+ /** */
+ private const int OpForHost = 6;
+
+ /** */
+ private const int OpForNodeIds = 7;
+
+ /** */
+ private const int OpMetadata = 8;
+
+ /** */
+ private const int OpMetrics = 9;
+
+ /** */
+ private const int OpMetricsFiltered = 10;
+
+ /** */
+ private const int OpNodeMetrics = 11;
+
+ /** */
+ private const int OpNodes = 12;
+
+ /** */
+ private const int OpPingNode = 13;
+
+ /** */
+ private const int OpTopology = 14;
+
+ /** Initial Ignite instance. */
+ private readonly Ignite _ignite;
+
+ /** Predicate. */
+ private readonly Func<IClusterNode, bool> _pred;
+
+ /** Topology version. */
+ [SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")]
+ private long _topVer = TopVerInit;
+
+ /** Nodes for the given topology version. */
+ private volatile IList<IClusterNode> _nodes;
+
+ /** Processor. */
+ private readonly IUnmanagedTarget _proc;
+
+ /** Compute. */
+ private readonly Lazy<Compute> _comp;
+
+ /** Messaging. */
+ private readonly Lazy<Messaging> _msg;
+
+ /** Events. */
+ private readonly Lazy<Events> _events;
+
+ /** Services. */
+ private readonly Lazy<IServices> _services;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="proc">Processor.</param>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ /// <param name="ignite">Grid.</param>
+ /// <param name="pred">Predicate.</param>
+ public ClusterGroupImpl(IUnmanagedTarget proc, IUnmanagedTarget target, PortableMarshaller marsh,
+ Ignite ignite, Func<IClusterNode, bool> pred)
+ : base(target, marsh)
+ {
+ _proc = proc;
+ _ignite = ignite;
+ _pred = pred;
+
+ _comp = new Lazy<Compute>(() =>
+ new Compute(new ComputeImpl(UU.ProcessorCompute(proc, target), marsh, this, false)));
+
+ _msg = new Lazy<Messaging>(() => new Messaging(UU.ProcessorMessage(proc, target), marsh, this));
+
+ _events = new Lazy<Events>(() => new Events(UU.ProcessorEvents(proc, target), marsh, this));
+
+ _services = new Lazy<IServices>(() =>
+ new Services(UU.ProcessorServices(proc, target), marsh, this, false, false));
+ }
+
+ /** <inheritDoc /> */
+ public IIgnite Ignite
+ {
+ get { return _ignite; }
+ }
+
+ /** <inheritDoc /> */
+ public ICompute GetCompute()
+ {
+ return _comp.Value;
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodes(IEnumerable<IClusterNode> nodes)
+ {
+ IgniteArgumentCheck.NotNull(nodes, "nodes");
+
+ return ForNodeIds0(nodes, node => node.Id);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodes(params IClusterNode[] nodes)
+ {
+ IgniteArgumentCheck.NotNull(nodes, "nodes");
+
+ return ForNodeIds0(nodes, node => node.Id);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodeIds(IEnumerable<Guid> ids)
+ {
+ IgniteArgumentCheck.NotNull(ids, "ids");
+
+ return ForNodeIds0(ids, null);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForNodeIds(params Guid[] ids)
+ {
+ IgniteArgumentCheck.NotNull(ids, "ids");
+
+ return ForNodeIds0(ids, null);
+ }
+
+ /// <summary>
+ /// Internal routine to get projection for specific node IDs.
+ /// </summary>
+ /// <param name="items">Items.</param>
+ /// <param name="func">Function to transform item to Guid (optional).</param>
+ /// <returns></returns>
+ private IClusterGroup ForNodeIds0<T>(IEnumerable<T> items, Func<T, Guid> func)
+ {
+ Debug.Assert(items != null);
+
+ IUnmanagedTarget prj = DoProjetionOutOp(OpForNodeIds, writer =>
+ {
+ WriteEnumerable(writer, items, func);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForPredicate(Func<IClusterNode, bool> p)
+ {
+ var newPred = _pred == null ? p : node => _pred(node) && p(node);
+
+ return new ClusterGroupImpl(_proc, Target, Marshaller, _ignite, newPred);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForAttribute(string name, string val)
+ {
+ IgniteArgumentCheck.NotNull(name, "name");
+
+ IUnmanagedTarget prj = DoProjetionOutOp(OpForAttribute, writer =>
+ {
+ writer.WriteString(name);
+ writer.WriteString(val);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /// <summary>
+ /// Creates projection with a specified op.
+ /// </summary>
+ /// <param name="name">Cache name to include into projection.</param>
+ /// <param name="op">Operation id.</param>
+ /// <returns>
+ /// Projection over nodes that have specified cache running.
+ /// </returns>
+ private IClusterGroup ForCacheNodes(string name, int op)
+ {
+ IUnmanagedTarget prj = DoProjetionOutOp(op, writer =>
+ {
+ writer.WriteString(name);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForCacheNodes(string name)
+ {
+ return ForCacheNodes(name, OpForCache);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForDataNodes(string name)
+ {
+ return ForCacheNodes(name, OpForData);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForClientNodes(string name)
+ {
+ return ForCacheNodes(name, OpForClient);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForRemotes()
+ {
+ return GetClusterGroup(UU.ProjectionForRemotes(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForHost(IClusterNode node)
+ {
+ IgniteArgumentCheck.NotNull(node, "node");
+
+ IUnmanagedTarget prj = DoProjetionOutOp(OpForHost, writer =>
+ {
+ writer.WriteGuid(node.Id);
+ });
+
+ return GetClusterGroup(prj);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForRandom()
+ {
+ return GetClusterGroup(UU.ProjectionForRandom(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForOldest()
+ {
+ return GetClusterGroup(UU.ProjectionForOldest(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForYoungest()
+ {
+ return GetClusterGroup(UU.ProjectionForYoungest(Target));
+ }
+
+ /** <inheritDoc /> */
+ public IClusterGroup ForDotNet()
+ {
+ return ForAttribute(AttrPlatform, Platform);
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<IClusterNode> GetNodes()
+ {
+ return RefreshNodes();
+ }
+
+ /** <inheritDoc /> */
+ public IClusterNode GetNode(Guid id)
+ {
+ return GetNodes().FirstOrDefault(node => node.Id == id);
+ }
+
+ /** <inheritDoc /> */
+ public IClusterNode GetNode()
+ {
+ return GetNodes().FirstOrDefault();
+ }
+
+ /** <inheritDoc /> */
+ public IClusterMetrics GetMetrics()
+ {
+ if (_pred == null)
+ {
+ return DoInOp(OpMetrics, stream =>
+ {
+ IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ });
+ }
+ return DoOutInOp(OpMetricsFiltered, writer =>
+ {
+ WriteEnumerable(writer, GetNodes().Select(node => node.Id));
+ }, stream =>
+ {
+ IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ });
+ }
+
+ /** <inheritDoc /> */
+ public IMessaging GetMessaging()
+ {
+ return _msg.Value;
+ }
+
+ /** <inheritDoc /> */
+ public IEvents GetEvents()
+ {
+ return _events.Value;
+ }
+
+ /** <inheritDoc /> */
+ public IServices GetServices()
+ {
+ return _services.Value;
+ }
+
+ /// <summary>
+ /// Pings a remote node.
+ /// </summary>
+ /// <param name="nodeId">ID of a node to ping.</param>
+ /// <returns>True if node for a given ID is alive, false otherwise.</returns>
+ internal bool PingNode(Guid nodeId)
+ {
+ return DoOutOp(OpPingNode, nodeId) == True;
+ }
+
+ /// <summary>
+ /// Predicate (if any).
+ /// </summary>
+ public Func<IClusterNode, bool> Predicate
+ {
+ get { return _pred; }
+ }
+
+ /// <summary>
+ /// Refresh cluster node metrics.
+ /// </summary>
+ /// <param name="nodeId">Node</param>
+ /// <param name="lastUpdateTime"></param>
+ /// <returns></returns>
+ internal ClusterMetricsImpl RefreshClusterNodeMetrics(Guid nodeId, long lastUpdateTime)
+ {
+ return DoOutInOp(OpNodeMetrics, writer =>
+ {
+ writer.WriteGuid(nodeId);
+ writer.WriteLong(lastUpdateTime);
+ }, stream =>
+ {
+ IPortableRawReader reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ }
+ );
+ }
+
+ /// <summary>
+ /// Gets a topology by version. Returns null if topology history storage doesn't contain
+ /// specified topology version (history currently keeps the last 1000 snapshots).
+ /// </summary>
+ /// <param name="version">Topology version.</param>
+ /// <returns>Collection of Ignite nodes which represented by specified topology version,
+ /// if it is present in history storage, {@code null} otherwise.</returns>
+ /// <exception cref="IgniteException">If underlying SPI implementation does not support
+ /// topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+ /// supports topology history.</exception>
+ internal ICollection<IClusterNode> Topology(long version)
+ {
+ return DoOutInOp(OpTopology, writer => writer.WriteLong(version),
+ input => IgniteUtils.ReadNodes(Marshaller.StartUnmarshal(input)));
+ }
+
+ /// <summary>
+ /// Topology version.
+ /// </summary>
+ internal long TopologyVersion
+ {
+ get
+ {
+ RefreshNodes();
+
+ return Interlocked.Read(ref _topVer);
+ }
+ }
+
+ /// <summary>
+ /// Update topology.
+ /// </summary>
+ /// <param name="newTopVer">New topology version.</param>
+ /// <param name="newNodes">New nodes.</param>
+ internal void UpdateTopology(long newTopVer, List<IClusterNode> newNodes)
+ {
+ lock (this)
+ {
+ // If another thread already advanced topology version further, we still
+ // can safely return currently received nodes, but we will not assign them.
+ if (_topVer < newTopVer)
+ {
+ Interlocked.Exchange(ref _topVer, newTopVer);
+
+ _nodes = newNodes.AsReadOnly();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Get current nodes without refreshing the topology.
+ /// </summary>
+ /// <returns>Current nodes.</returns>
+ internal IList<IClusterNode> NodesNoRefresh()
+ {
+ return _nodes;
+ }
+
+ /// <summary>
+ /// Creates new Cluster Group from given native projection.
+ /// </summary>
+ /// <param name="prj">Native projection.</param>
+ /// <returns>New cluster group.</returns>
+ private IClusterGroup GetClusterGroup(IUnmanagedTarget prj)
+ {
+ return new ClusterGroupImpl(_proc, prj, Marshaller, _ignite, _pred);
+ }
+
+ /// <summary>
+ /// Refresh projection nodes.
+ /// </summary>
+ /// <returns>Nodes.</returns>
+ private IList<IClusterNode> RefreshNodes()
+ {
+ long oldTopVer = Interlocked.Read(ref _topVer);
+
+ List<IClusterNode> newNodes = null;
+
+ DoOutInOp(OpNodes, writer =>
+ {
+ writer.WriteLong(oldTopVer);
+ }, input =>
+ {
+ PortableReaderImpl reader = Marshaller.StartUnmarshal(input);
+
+ if (reader.ReadBoolean())
+ {
+ // Topology has been updated.
+ long newTopVer = reader.ReadLong();
+
+ newNodes = IgniteUtils.ReadNodes(reader, _pred);
+
+ UpdateTopology(newTopVer, newNodes);
+ }
+ });
+
+ if (newNodes != null)
+ return newNodes;
+
+ // No topology changes.
+ Debug.Assert(_nodes != null, "At least one topology update should have occurred.");
+
+ return _nodes;
+ }
+
+ /// <summary>
+ /// Perform synchronous out operation returning value.
+ /// </summary>
+ /// <param name="type">Operation type.</param>
+ /// <param name="action">Action.</param>
+ /// <returns>Native projection.</returns>
+ private IUnmanagedTarget DoProjetionOutOp(int type, Action<PortableWriterImpl> action)
+ {
+ using (var stream = IgniteManager.Memory.Allocate().Stream())
+ {
+ var writer = Marshaller.StartMarshal(stream);
+
+ action(writer);
+
+ FinishMarshal(writer);
+
+ return UU.ProjectionOutOpRet(Target, type, stream.SynchronizeOutput());
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IPortableMetadata Metadata(int typeId)
+ {
+ return DoOutInOp<IPortableMetadata>(OpMetadata,
+ writer =>
+ {
+ writer.WriteInt(typeId);
+ },
+ stream =>
+ {
+ PortableReaderImpl reader = Marshaller.StartUnmarshal(stream, false);
+
+ return reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null;
+ }
+ );
+ }
+
+ /// <summary>
+ /// Gets metadata for all known types.
+ /// </summary>
+ public List<IPortableMetadata> Metadata()
+ {
+ return DoInOp(OpAllMetadata, s =>
+ {
+ var reader = Marshaller.StartUnmarshal(s);
+
+ var size = reader.ReadInt();
+
+ var res = new List<IPortableMetadata>(size);
+
+ for (var i = 0; i < size; i++)
+ res.Add(reader.ReadBoolean() ? new PortableMetadataImpl(reader) : null);
+
+ return res;
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs
new file mode 100644
index 0000000..664a1f1
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterMetricsImpl.cs
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using System;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Cluster metrics implementation.
+ /// </summary>
+ internal class ClusterMetricsImpl : IClusterMetrics
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClusterMetricsImpl"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ClusterMetricsImpl(IPortableRawReader reader)
+ {
+ LastUpdateTimeRaw = reader.ReadLong();
+
+ DateTime? lastUpdateTime0 = reader.ReadDate();
+
+ LastUpdateTime = lastUpdateTime0 ?? default(DateTime);
+ MaximumActiveJobs = reader.ReadInt();
+ CurrentActiveJobs = reader.ReadInt();
+ AverageActiveJobs = reader.ReadFloat();
+ MaximumWaitingJobs = reader.ReadInt();
+
+ CurrentWaitingJobs = reader.ReadInt();
+ AverageWaitingJobs = reader.ReadFloat();
+ MaximumRejectedJobs = reader.ReadInt();
+ CurrentRejectedJobs = reader.ReadInt();
+ AverageRejectedJobs = reader.ReadFloat();
+
+ TotalRejectedJobs = reader.ReadInt();
+ MaximumCancelledJobs = reader.ReadInt();
+ CurrentCancelledJobs = reader.ReadInt();
+ AverageCancelledJobs = reader.ReadFloat();
+ TotalCancelledJobs = reader.ReadInt();
+
+ TotalExecutedJobs = reader.ReadInt();
+ MaximumJobWaitTime = reader.ReadLong();
+ CurrentJobWaitTime = reader.ReadLong();
+ AverageJobWaitTime = reader.ReadDouble();
+ MaximumJobExecuteTime = reader.ReadLong();
+
+ CurrentJobExecuteTime = reader.ReadLong();
+ AverageJobExecuteTime = reader.ReadDouble();
+ TotalExecutedTasks = reader.ReadInt();
+ TotalIdleTime = reader.ReadLong();
+ CurrentIdleTime = reader.ReadLong();
+
+ TotalCpus = reader.ReadInt();
+ CurrentCpuLoad = reader.ReadDouble();
+ AverageCpuLoad = reader.ReadDouble();
+ CurrentGcCpuLoad = reader.ReadDouble();
+ HeapMemoryInitialized = reader.ReadLong();
+
+ HeapMemoryUsed = reader.ReadLong();
+ HeapMemoryCommitted = reader.ReadLong();
+ HeapMemoryMaximum = reader.ReadLong();
+ HeapMemoryTotal = reader.ReadLong();
+ NonHeapMemoryInitialized = reader.ReadLong();
+
+ NonHeapMemoryUsed = reader.ReadLong();
+ NonHeapMemoryCommitted = reader.ReadLong();
+ NonHeapMemoryMaximum = reader.ReadLong();
+ NonHeapMemoryTotal = reader.ReadLong();
+ UpTime = reader.ReadLong();
+
+ DateTime? startTime0 = reader.ReadDate();
+
+ StartTime = startTime0 ?? default(DateTime);
+
+ DateTime? nodeStartTime0 = reader.ReadDate();
+
+ NodeStartTime = nodeStartTime0 ?? default(DateTime);
+
+ CurrentThreadCount = reader.ReadInt();
+ MaximumThreadCount = reader.ReadInt();
+ TotalStartedThreadCount = reader.ReadLong();
+ CurrentDaemonThreadCount = reader.ReadInt();
+ LastDataVersion = reader.ReadLong();
+
+ SentMessagesCount = reader.ReadInt();
+ SentBytesCount = reader.ReadLong();
+ ReceivedMessagesCount = reader.ReadInt();
+ ReceivedBytesCount = reader.ReadLong();
+ OutboundMessagesQueueSize = reader.ReadInt();
+
+ TotalNodes = reader.ReadInt();
+ }
+
+ /// <summary>
+ /// Last update time in raw format.
+ /// </summary>
+ internal long LastUpdateTimeRaw { get; set; }
+
+ /** <inheritDoc /> */
+ public DateTime LastUpdateTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumActiveJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentActiveJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageActiveJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumWaitingJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentWaitingJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageWaitingJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalRejectedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public float AverageCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalCancelledJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalExecutedJobs { get; private set; }
+
+ /** <inheritDoc /> */
+ public long MaximumJobWaitTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long CurrentJobWaitTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public double AverageJobWaitTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long MaximumJobExecuteTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long CurrentJobExecuteTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public double AverageJobExecuteTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalExecutedTasks { get; private set; }
+
+ /** <inheritDoc /> */
+ public long TotalBusyTime
+ {
+ get { return UpTime - TotalIdleTime; }
+ }
+
+ /** <inheritDoc /> */
+ public long TotalIdleTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public long CurrentIdleTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public float BusyTimePercentage
+ {
+ get { return 1 - IdleTimePercentage; }
+ }
+
+ /** <inheritDoc /> */
+ public float IdleTimePercentage
+ {
+ get { return TotalIdleTime / (float) UpTime; }
+ }
+
+ /** <inheritDoc /> */
+ public int TotalCpus { get; private set; }
+
+ /** <inheritDoc /> */
+ public double CurrentCpuLoad { get; private set; }
+
+ /** <inheritDoc /> */
+ public double AverageCpuLoad { get; private set; }
+
+ /** <inheritDoc /> */
+ public double CurrentGcCpuLoad { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryInitialized { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryUsed { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryCommitted { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryMaximum { get; private set; }
+
+ /** <inheritDoc /> */
+ public long HeapMemoryTotal { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryInitialized { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryUsed { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryCommitted { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryMaximum { get; private set; }
+
+ /** <inheritDoc /> */
+ public long NonHeapMemoryTotal { get; private set; }
+
+ /** <inheritDoc /> */
+ public long UpTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public DateTime StartTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public DateTime NodeStartTime { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int MaximumThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long TotalStartedThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int CurrentDaemonThreadCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long LastDataVersion { get; private set; }
+
+ /** <inheritDoc /> */
+ public int SentMessagesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long SentBytesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int ReceivedMessagesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public long ReceivedBytesCount { get; private set; }
+
+ /** <inheritDoc /> */
+ public int OutboundMessagesQueueSize { get; private set; }
+
+ /** <inheritDoc /> */
+ public int TotalNodes { get; private set; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs
new file mode 100644
index 0000000..da49feb
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterNodeImpl.cs
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using System;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Impl.Collections;
+ using Apache.Ignite.Core.Impl.Common;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ /// Cluster node implementation.
+ /// </summary>
+ internal class ClusterNodeImpl : IClusterNode
+ {
+ /** Node ID. */
+ private readonly Guid _id;
+
+ /** Attributes. */
+ private readonly IDictionary<string, object> _attrs;
+
+ /** Addresses. */
+ private readonly ICollection<string> _addrs;
+
+ /** Hosts. */
+ private readonly ICollection<string> _hosts;
+
+ /** Order. */
+ private readonly long _order;
+
+ /** Local flag. */
+ private readonly bool _local;
+
+ /** Daemon flag. */
+ private readonly bool _daemon;
+
+ /** Metrics. */
+ private volatile ClusterMetricsImpl _metrics;
+
+ /** Ignite reference. */
+ private WeakReference _igniteRef;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClusterNodeImpl"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ClusterNodeImpl(IPortableRawReader reader)
+ {
+ _id = reader.ReadGuid() ?? default(Guid);
+
+ _attrs = reader.ReadGenericDictionary<string, object>().AsReadOnly();
+ _addrs = reader.ReadGenericCollection<string>().AsReadOnly();
+ _hosts = reader.ReadGenericCollection<string>().AsReadOnly();
+ _order = reader.ReadLong();
+ _local = reader.ReadBoolean();
+ _daemon = reader.ReadBoolean();
+
+ _metrics = reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null;
+ }
+
+ /** <inheritDoc /> */
+ public Guid Id
+ {
+ get { return _id; }
+ }
+
+ /** <inheritDoc /> */
+ public T GetAttribute<T>(string name)
+ {
+ IgniteArgumentCheck.NotNull(name, "name");
+
+ return (T)_attrs[name];
+ }
+
+ /** <inheritDoc /> */
+ public bool TryGetAttribute<T>(string name, out T attr)
+ {
+ IgniteArgumentCheck.NotNull(name, "name");
+
+ object val;
+
+ if (_attrs.TryGetValue(name, out val))
+ {
+ attr = (T)val;
+
+ return true;
+ }
+ attr = default(T);
+
+ return false;
+ }
+
+ /** <inheritDoc /> */
+ public IDictionary<string, object> GetAttributes()
+ {
+ return _attrs;
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<string> Addresses
+ {
+ get
+ {
+ return _addrs;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public ICollection<string> HostNames
+ {
+ get
+ {
+ return _hosts;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public long Order
+ {
+ get
+ {
+ return _order;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public bool IsLocal
+ {
+ get
+ {
+ return _local;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public bool IsDaemon
+ {
+ get
+ {
+ return _daemon;
+ }
+ }
+
+ /** <inheritDoc /> */
+ public IClusterMetrics GetMetrics()
+ {
+ var ignite = (Ignite)_igniteRef.Target;
+
+ if (ignite == null)
+ return _metrics;
+
+ ClusterMetricsImpl oldMetrics = _metrics;
+
+ long lastUpdateTime = oldMetrics.LastUpdateTimeRaw;
+
+ ClusterMetricsImpl newMetrics = ignite.ClusterGroup.RefreshClusterNodeMetrics(_id, lastUpdateTime);
+
+ if (newMetrics != null)
+ {
+ lock (this)
+ {
+ if (_metrics.LastUpdateTime < newMetrics.LastUpdateTime)
+ _metrics = newMetrics;
+ }
+
+ return newMetrics;
+ }
+
+ return oldMetrics;
+ }
+
+ /** <inheritDoc /> */
+ public override string ToString()
+ {
+ return "GridNode [id=" + Id + ']';
+ }
+
+ /** <inheritDoc /> */
+ public override bool Equals(object obj)
+ {
+ ClusterNodeImpl node = obj as ClusterNodeImpl;
+
+ if (node != null)
+ return _id.Equals(node._id);
+
+ return false;
+ }
+
+ /** <inheritDoc /> */
+ public override int GetHashCode()
+ {
+ // ReSharper disable once NonReadonlyMemberInGetHashCode
+ return _id.GetHashCode();
+ }
+
+ /// <summary>
+ /// Initializes this instance with a grid.
+ /// </summary>
+ /// <param name="grid">The grid.</param>
+ internal void Init(Ignite grid)
+ {
+ _igniteRef = new WeakReference(grid);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs
new file mode 100644
index 0000000..554eb0a
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Cluster/IClusterGroupEx.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cluster
+{
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Portable;
+
+ /// <summary>
+ ///
+ /// </summary>
+ internal interface IClusterGroupEx : IClusterGroup
+ {
+ /// <summary>
+ /// Gets protable metadata for type.
+ /// </summary>
+ /// <param name="typeId">Type ID.</param>
+ /// <returns>Metadata.</returns>
+ IPortableMetadata Metadata(int typeId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs
new file mode 100644
index 0000000..57295cb
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/CollectionExtensions.cs
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Collections
+{
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Collection extension methods.
+ /// </summary>
+ public static class CollectionExtensions
+ {
+ /// <summary>
+ /// Returns a read-only System.Collections.Generic.IDictionary{K, V} wrapper for the current collection.
+ /// </summary>
+ public static IDictionary<TKey, TValue> AsReadOnly<TKey, TValue>(this IDictionary<TKey, TValue> dict)
+ {
+ return new ReadOnlyDictionary<TKey, TValue>(dict);
+ }
+
+ /// <summary>
+ /// Returns a read-only System.Collections.Generic.ICollection{K, V} wrapper for the current collection.
+ /// </summary>
+ public static ICollection<T> AsReadOnly<T>(this ICollection<T> col)
+ {
+ var list = col as List<T>;
+
+ return list != null ? (ICollection<T>) list.AsReadOnly() : new ReadOnlyCollection<T>(col);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs
new file mode 100644
index 0000000..bd7e895
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/MultiValueDictionary.cs
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Collections
+{
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Multiple-values-per-key dictionary.
+ /// </summary>
+ public class MultiValueDictionary<TKey, TValue>
+ {
+ /** Inner dictionary */
+ private readonly Dictionary<TKey, object> _dict = new Dictionary<TKey, object>();
+
+ /// <summary>
+ /// Adds a value.
+ /// </summary>
+ /// <param name="key">The key.</param>
+ /// <param name="val">The value.</param>
+ public void Add(TKey key, TValue val)
+ {
+ object val0;
+
+ if (_dict.TryGetValue(key, out val0))
+ {
+ var list = val0 as List<TValue>;
+
+ if (list != null)
+ list.Add(val);
+ else
+ _dict[key] = new List<TValue> {(TValue) val0, val};
+ }
+ else
+ _dict[key] = val;
+ }
+
+ /// <summary>
+ /// Tries the get a value. In case of multiple values for a key, returns the last one.
+ /// </summary>
+ /// <param name="key">The key.</param>
+ /// <param name="val">The value.</param>
+ /// <returns>True if value has been found for specified key; otherwise false.</returns>
+ public bool TryGetValue(TKey key, out TValue val)
+ {
+ object val0;
+
+ if (!_dict.TryGetValue(key, out val0))
+ {
+ val = default(TValue);
+ return false;
+ }
+
+ var list = val0 as List<TValue>;
+
+ if (list != null)
+ val = list[list.Count - 1];
+ else
+ val = (TValue) val0;
+
+ return true;
+ }
+
+ /// <summary>
+ /// Removes the specified value for the specified key.
+ /// </summary>
+ /// <param name="key">The key.</param>
+ /// <param name="val">The value.</param>
+ public void Remove(TKey key, TValue val)
+ {
+ object val0;
+
+ if (!_dict.TryGetValue(key, out val0))
+ return;
+
+ var list = val0 as List<TValue>;
+
+ if (list != null)
+ {
+ list.Remove(val);
+
+ if (list.Count == 0)
+ _dict.Remove(key);
+ }
+ else if (Equals(val0, val))
+ _dict.Remove(key);
+ }
+
+ /// <summary>
+ /// Removes the last value for the specified key and returns it.
+ /// </summary>
+ /// <param name="key">The key.</param>
+ /// <param name="val">The value.</param>
+ /// <returns>True if value has been found for specified key; otherwise false.</returns>
+ public bool TryRemove(TKey key, out TValue val)
+ {
+ object val0;
+
+ if (!_dict.TryGetValue(key, out val0))
+ {
+ val = default(TValue);
+
+ return false;
+ }
+
+ var list = val0 as List<TValue>;
+
+ if (list != null)
+ {
+ var index = list.Count - 1;
+
+ val = list[index];
+
+ list.RemoveAt(index);
+
+ if (list.Count == 0)
+ _dict.Remove(key);
+
+ return true;
+ }
+
+ val = (TValue) val0;
+
+ _dict.Remove(key);
+
+ return true;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyCollection.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyCollection.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyCollection.cs
new file mode 100644
index 0000000..23cae6b
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyCollection.cs
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Collections
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Read-only wrapper over ICollection{T}.
+ /// </summary>
+ internal struct ReadOnlyCollection<T> : ICollection<T>
+ {
+ /** Wrapped collection. */
+ private readonly ICollection<T> _col;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ReadOnlyCollection{T}"/> class.
+ /// </summary>
+ public ReadOnlyCollection(ICollection<T> col)
+ {
+ _col = col;
+ }
+
+ /** <inheritdoc /> */
+ public IEnumerator<T> GetEnumerator()
+ {
+ return _col.GetEnumerator();
+ }
+
+ /** <inheritdoc /> */
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return ((IEnumerable) _col).GetEnumerator();
+ }
+
+ /** <inheritdoc /> */
+ public void Add(T item)
+ {
+ throw GetReadOnlyException();
+ }
+
+ /** <inheritdoc /> */
+ public void Clear()
+ {
+ throw GetReadOnlyException();
+ }
+
+ /** <inheritdoc /> */
+ public bool Contains(T item)
+ {
+ return _col.Contains(item);
+ }
+
+ /** <inheritdoc /> */
+ public void CopyTo(T[] array, int arrayIndex)
+ {
+ _col.CopyTo(array, arrayIndex);
+ }
+
+ /** <inheritdoc /> */
+ public bool Remove(T item)
+ {
+ throw GetReadOnlyException();
+ }
+
+ /** <inheritdoc /> */
+ public int Count
+ {
+ get { return _col.Count; }
+ }
+
+ /** <inheritdoc /> */
+ public bool IsReadOnly
+ {
+ get { return true; }
+ }
+
+ /// <summary>
+ /// Gets the readonly exception.
+ /// </summary>
+ private static Exception GetReadOnlyException()
+ {
+ return new NotSupportedException("Collection is read-only.");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyDictionary.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyDictionary.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyDictionary.cs
new file mode 100644
index 0000000..60ec9d0
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Collections/ReadOnlyDictionary.cs
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Collections
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+
+ /// <summary>
+ /// Read-only wrapper over IDictionary{K, V}.
+ /// </summary>
+ internal struct ReadOnlyDictionary<TKey, TValue> : IDictionary<TKey, TValue>
+ {
+ /** Inner dict. */
+ private readonly IDictionary<TKey, TValue> _dict;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ReadOnlyDictionary{K, V}"/> class.
+ /// </summary>
+ /// <param name="dict">The dictionary to wrap.</param>
+ public ReadOnlyDictionary(IDictionary<TKey, TValue> dict)
+ {
+ Debug.Assert(dict != null);
+
+ _dict = dict;
+ }
+
+ /** <inheritdoc /> */
+ public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
+ {
+ return _dict.GetEnumerator();
+ }
+
+ /** <inheritdoc /> */
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return ((IEnumerable) _dict).GetEnumerator();
+ }
+
+ /** <inheritdoc /> */
+ public void Add(KeyValuePair<TKey, TValue> item)
+ {
+ throw GetReadonlyException();
+ }
+
+ /** <inheritdoc /> */
+ public void Clear()
+ {
+ throw GetReadonlyException();
+ }
+
+ /** <inheritdoc /> */
+ public bool Contains(KeyValuePair<TKey, TValue> item)
+ {
+ return _dict.Contains(item);
+ }
+
+ /** <inheritdoc /> */
+ public void CopyTo(KeyValuePair<TKey, TValue>[] array, int arrayIndex)
+ {
+ _dict.CopyTo(array, arrayIndex);
+ }
+
+ /** <inheritdoc /> */
+ public bool Remove(KeyValuePair<TKey, TValue> item)
+ {
+ throw GetReadonlyException();
+ }
+
+ /** <inheritdoc /> */
+ public int Count
+ {
+ get { return _dict.Count; }
+ }
+
+ /** <inheritdoc /> */
+ public bool IsReadOnly
+ {
+ get { return true; }
+ }
+
+ /** <inheritdoc /> */
+ public bool ContainsKey(TKey key)
+ {
+ return _dict.ContainsKey(key);
+ }
+
+ /** <inheritdoc /> */
+ public void Add(TKey key, TValue value)
+ {
+ throw GetReadonlyException();
+ }
+
+ /** <inheritdoc /> */
+ public bool Remove(TKey key)
+ {
+ return _dict.Remove(key);
+ }
+
+ /** <inheritdoc /> */
+ public bool TryGetValue(TKey key, out TValue value)
+ {
+ return _dict.TryGetValue(key, out value);
+ }
+
+ /** <inheritdoc /> */
+ public TValue this[TKey key]
+ {
+ get { return _dict[key]; }
+ set { throw GetReadonlyException(); }
+ }
+
+ /** <inheritdoc /> */
+ public ICollection<TKey> Keys
+ {
+ get { return _dict.Keys; }
+ }
+
+ /** <inheritdoc /> */
+ public ICollection<TValue> Values
+ {
+ get { return _dict.Values; }
+ }
+
+ /// <summary>
+ /// Gets the readonly exception.
+ /// </summary>
+ private static Exception GetReadonlyException()
+ {
+ return new NotSupportedException("Dictionary is read-only.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs
new file mode 100644
index 0000000..4e5c396
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/AsyncResult.cs
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+ using Apache.Ignite.Core.Common;
+
+ /// <summary>
+ /// Adapts IGridFuture to the IAsyncResult.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable",
+ Justification = "Implementing IDisposable has no point since we return this class as IAsyncResult " +
+ "to the client, and IAsyncResult is not IDisposable.")]
+ public class AsyncResult : IAsyncResult
+ {
+ /** */
+ private readonly ManualResetEvent _waitHandle;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AsyncResult"/> class.
+ /// </summary>
+ /// <param name="fut">The future to wrap.</param>
+ public AsyncResult(IFuture fut)
+ {
+ _waitHandle = new ManualResetEvent(false);
+
+ fut.Listen(() => _waitHandle.Set());
+ }
+
+ /** <inheritdoc /> */
+ public bool IsCompleted
+ {
+ get { return _waitHandle.WaitOne(0); }
+ }
+
+ /** <inheritdoc /> */
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _waitHandle; }
+ }
+
+ /** <inheritdoc /> */
+ public object AsyncState
+ {
+ get { return null; }
+ }
+
+ /** <inheritdoc /> */
+ public bool CompletedSynchronously
+ {
+ get { return false; }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs
new file mode 100644
index 0000000..14195fd
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CompletedAsyncResult.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using System.Diagnostics.CodeAnalysis;
+ using System.Threading;
+
+ /// <summary>
+ /// Represents an IAsyncResult that is completed.
+ /// </summary>
+ [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable",
+ Justification = "Implementing IDisposable has no point since we return this class as IAsyncResult " +
+ "to the client, and IAsyncResult is not IDisposable.")]
+ public class CompletedAsyncResult : IAsyncResult
+ {
+ /** Singleton instance. */
+ public static readonly IAsyncResult Instance = new CompletedAsyncResult();
+
+ /** */
+ private readonly WaitHandle _asyncWaitHandle = new ManualResetEvent(true);
+
+ /// <summary>
+ /// Prevents a default instance of the <see cref="CompletedAsyncResult"/> class from being created.
+ /// </summary>
+ private CompletedAsyncResult()
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ public bool IsCompleted
+ {
+ get { return true; }
+ }
+
+ /** <inheritdoc /> */
+ public WaitHandle AsyncWaitHandle
+ {
+ get { return _asyncWaitHandle; }
+ }
+
+ /** <inheritdoc /> */
+ public object AsyncState
+ {
+ get { return null; }
+ }
+
+ /** <inheritdoc /> */
+ public bool CompletedSynchronously
+ {
+ get { return false; }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CopyOnWriteConcurrentDictionary.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CopyOnWriteConcurrentDictionary.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CopyOnWriteConcurrentDictionary.cs
new file mode 100644
index 0000000..fa785b2
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/CopyOnWriteConcurrentDictionary.cs
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using System.Collections.Generic;
+
+ /// <summary>
+ /// Concurrent dictionary with CopyOnWrite mechanism inside.
+ /// Good for frequent reads / infrequent writes scenarios.
+ /// </summary>
+ public class CopyOnWriteConcurrentDictionary<TKey, TValue>
+ {
+ /** */
+ private volatile Dictionary<TKey, TValue> _dict = new Dictionary<TKey, TValue>();
+
+ /// <summary>
+ /// Gets the value associated with the specified key.
+ /// </summary>
+ /// <param name="key">The key.</param>
+ /// <param name="val">The value.</param>
+ /// <returns>true if the dictionary contains an element with the specified key; otherwise, false.</returns>
+ public bool TryGetValue(TKey key, out TValue val)
+ {
+ return _dict.TryGetValue(key, out val);
+ }
+
+ /// <summary>
+ /// Adds a key/value pair if the key does not already exist.
+ /// </summary>
+ /// <param name="key">The key.</param>
+ /// <param name="valueFactory">The function used to generate a value for the key.</param>
+ /// <returns>The value for the key.</returns>
+ public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
+ {
+ lock (this)
+ {
+ TValue res;
+
+ if (_dict.TryGetValue(key, out res))
+ return res;
+
+ var dict0 = new Dictionary<TKey, TValue>(_dict);
+
+ res = valueFactory(key);
+
+ dict0[key] = res;
+
+ _dict = dict0;
+
+ return res;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
new file mode 100644
index 0000000..7f83588
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateConverter.cs
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using System.Diagnostics;
+ using System.Linq.Expressions;
+ using System.Reflection;
+ using System.Reflection.Emit;
+
+ /// <summary>
+ /// Converts generic and non-generic delegates.
+ /// </summary>
+ public static class DelegateConverter
+ {
+ /** */
+ private const string DefaultMethodName = "Invoke";
+
+ /// <summary>
+ /// Compiles a function without arguments.
+ /// </summary>
+ /// <param name="targetType">Type of the target.</param>
+ /// <returns>Compiled function that calls specified method on specified target.</returns>
+ public static Func<object, object> CompileFunc(Type targetType)
+ {
+ var method = targetType.GetMethod(DefaultMethodName);
+
+ var targetParam = Expression.Parameter(typeof(object));
+ var targetParamConverted = Expression.Convert(targetParam, targetType);
+
+ var callExpr = Expression.Call(targetParamConverted, method);
+ var convertResultExpr = Expression.Convert(callExpr, typeof(object));
+
+ return Expression.Lambda<Func<object, object>>(convertResultExpr, targetParam).Compile();
+ }
+
+ /// <summary>
+ /// Compiles a function with arbitrary number of arguments.
+ /// </summary>
+ /// <typeparam name="T">Resulting delegate type.</typeparam>
+ /// <param name="targetType">Type of the target.</param>
+ /// <param name="argTypes">Argument types.</param>
+ /// <param name="convertToObject">
+ /// Flags that indicate whether func params and/or return value should be converted from/to object.
+ /// </param>
+ /// <param name="methodName">Name of the method.</param>
+ /// <returns>
+ /// Compiled function that calls specified method on specified target.
+ /// </returns>
+ public static T CompileFunc<T>(Type targetType, Type[] argTypes, bool[] convertToObject = null,
+ string methodName = null)
+ where T : class
+ {
+ var method = targetType.GetMethod(methodName ?? DefaultMethodName, argTypes);
+
+ return CompileFunc<T>(targetType, method, argTypes, convertToObject);
+ }
+
+ /// <summary>
+ /// Compiles a function with arbitrary number of arguments.
+ /// </summary>
+ /// <typeparam name="T">Resulting delegate type.</typeparam>
+ /// <param name="method">Method.</param>
+ /// <param name="targetType">Type of the target.</param>
+ /// <param name="argTypes">Argument types.</param>
+ /// <param name="convertToObject">
+ /// Flags that indicate whether func params and/or return value should be converted from/to object.
+ /// </param>
+ /// <returns>
+ /// Compiled function that calls specified method on specified target.
+ /// </returns>
+ public static T CompileFunc<T>(Type targetType, MethodInfo method, Type[] argTypes,
+ bool[] convertToObject = null)
+ where T : class
+ {
+ if (argTypes == null)
+ {
+ var args = method.GetParameters();
+ argTypes = new Type[args.Length];
+
+ for (int i = 0; i < args.Length; i++)
+ argTypes[i] = args[i].ParameterType;
+ }
+
+ Debug.Assert(convertToObject == null || (convertToObject.Length == argTypes.Length + 1));
+ Debug.Assert(method != null);
+
+ targetType = method.IsStatic ? null : (targetType ?? method.DeclaringType);
+
+ var targetParam = Expression.Parameter(typeof(object));
+
+ Expression targetParamConverted = null;
+ ParameterExpression[] argParams;
+ int argParamsOffset = 0;
+
+ if (targetType != null)
+ {
+ targetParamConverted = Expression.Convert(targetParam, targetType);
+ argParams = new ParameterExpression[argTypes.Length + 1];
+ argParams[0] = targetParam;
+ argParamsOffset = 1;
+ }
+ else
+ argParams = new ParameterExpression[argTypes.Length]; // static method
+
+ var argParamsConverted = new Expression[argTypes.Length];
+
+ for (var i = 0; i < argTypes.Length; i++)
+ {
+ if (convertToObject == null || convertToObject[i])
+ {
+ var argParam = Expression.Parameter(typeof (object));
+ argParams[i + argParamsOffset] = argParam;
+ argParamsConverted[i] = Expression.Convert(argParam, argTypes[i]);
+ }
+ else
+ {
+ var argParam = Expression.Parameter(argTypes[i]);
+ argParams[i + argParamsOffset] = argParam;
+ argParamsConverted[i] = argParam;
+ }
+ }
+
+ Expression callExpr = Expression.Call(targetParamConverted, method, argParamsConverted);
+
+ if (convertToObject == null || convertToObject[argTypes.Length])
+ callExpr = Expression.Convert(callExpr, typeof(object));
+
+ return Expression.Lambda<T>(callExpr, argParams).Compile();
+ }
+
+ /// <summary>
+ /// Compiles a generic ctor with arbitrary number of arguments.
+ /// </summary>
+ /// <typeparam name="T">Result func type.</typeparam>
+ /// <param name="type">Type to be created by ctor.</param>
+ /// <param name="argTypes">Argument types.</param>
+ /// <param name="convertResultToObject">if set to <c>true</c> [convert result to object].
+ /// Flag that indicates whether ctor return value should be converted to object.
+ /// </param>
+ /// <returns>
+ /// Compiled generic constructor.
+ /// </returns>
+ public static T CompileCtor<T>(Type type, Type[] argTypes, bool convertResultToObject = true)
+ {
+ var ctor = type.GetConstructor(argTypes);
+
+ Debug.Assert(ctor != null);
+
+ var args = new ParameterExpression[argTypes.Length];
+ var argsConverted = new Expression[argTypes.Length];
+
+ for (var i = 0; i < argTypes.Length; i++)
+ {
+ var arg = Expression.Parameter(typeof(object));
+ args[i] = arg;
+ argsConverted[i] = Expression.Convert(arg, argTypes[i]);
+ }
+
+ Expression ctorExpr = Expression.New(ctor, argsConverted); // ctor takes args of specific types
+
+ if (convertResultToObject)
+ ctorExpr = Expression.Convert(ctorExpr, typeof (object)); // convert ctor result to object
+
+ return Expression.Lambda<T>(ctorExpr, args).Compile(); // lambda takes args as objects
+ }
+
+ /// <summary>
+ /// Compiles the field setter.
+ /// </summary>
+ /// <param name="field">The field.</param>
+ /// <returns>Compiled field setter.</returns>
+ public static Action<object, object> CompileFieldSetter(FieldInfo field)
+ {
+ Debug.Assert(field != null);
+ Debug.Assert(field.DeclaringType != null); // non-static
+
+ var targetParam = Expression.Parameter(typeof(object));
+ var targetParamConverted = Expression.Convert(targetParam, field.DeclaringType);
+
+ var valParam = Expression.Parameter(typeof(object));
+ var valParamConverted = Expression.Convert(valParam, field.FieldType);
+
+ var assignExpr = Expression.Call(GetWriteFieldMethod(field), targetParamConverted, valParamConverted);
+
+ return Expression.Lambda<Action<object, object>>(assignExpr, targetParam, valParam).Compile();
+ }
+
+ /// <summary>
+ /// Compiles the property setter.
+ /// </summary>
+ /// <param name="prop">The property.</param>
+ /// <returns>Compiled property setter.</returns>
+ public static Action<object, object> CompilePropertySetter(PropertyInfo prop)
+ {
+ Debug.Assert(prop != null);
+ Debug.Assert(prop.DeclaringType != null); // non-static
+
+ var targetParam = Expression.Parameter(typeof(object));
+ var targetParamConverted = Expression.Convert(targetParam, prop.DeclaringType);
+
+ var valParam = Expression.Parameter(typeof(object));
+ var valParamConverted = Expression.Convert(valParam, prop.PropertyType);
+
+ var fld = Expression.Property(targetParamConverted, prop);
+
+ var assignExpr = Expression.Assign(fld, valParamConverted);
+
+ return Expression.Lambda<Action<object, object>>(assignExpr, targetParam, valParam).Compile();
+ }
+
+ /// <summary>
+ /// Gets a method to write a field (including private and readonly).
+ /// NOTE: Expression Trees can't write readonly fields.
+ /// </summary>
+ /// <param name="field">The field.</param>
+ /// <returns>Resulting MethodInfo.</returns>
+ public static DynamicMethod GetWriteFieldMethod(FieldInfo field)
+ {
+ Debug.Assert(field != null);
+
+ var module = Assembly.GetExecutingAssembly().GetModules()[0];
+
+ var method = new DynamicMethod(string.Empty, null, new[] { field.DeclaringType, field.FieldType }, module,
+ true);
+
+ var il = method.GetILGenerator();
+
+ il.Emit(OpCodes.Ldarg_0);
+ il.Emit(OpCodes.Ldarg_1);
+ il.Emit(OpCodes.Stfld, field);
+ il.Emit(OpCodes.Ret);
+
+ return method;
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f2eb16cd/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
----------------------------------------------------------------------
diff --git a/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
new file mode 100644
index 0000000..8d7cb3a
--- /dev/null
+++ b/modules/platform/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System;
+ using Apache.Ignite.Core.Cache;
+ using Apache.Ignite.Core.Compute;
+ using Apache.Ignite.Core.Datastream;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl.Cache;
+ using Apache.Ignite.Core.Impl.Datastream;
+ using Apache.Ignite.Core.Impl.Portable.IO;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+ using Apache.Ignite.Core.Messaging;
+
+ /// <summary>
+ /// Type descriptor with precompiled delegates for known methods.
+ /// </summary>
+ internal class DelegateTypeDescriptor
+ {
+ /** Cached decriptors. */
+ private static readonly CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor> Descriptors
+ = new CopyOnWriteConcurrentDictionary<Type, DelegateTypeDescriptor>();
+
+ /** */
+ private readonly Func<object, object> _computeOutFunc;
+
+ /** */
+ private readonly Func<object, object, object> _computeFunc;
+
+ /** */
+ private readonly Func<object, Guid, object, bool> _eventFilter;
+
+ /** */
+ private readonly Func<object, object, object, bool> _cacheEntryFilter;
+
+ /** */
+ private readonly Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
+ _cacheEntryProcessor;
+
+ /** */
+ private readonly Func<object, Guid, object, bool> _messageFilter;
+
+ /** */
+ private readonly Func<object, object> _computeJobExecute;
+
+ /** */
+ private readonly Action<object> _computeJobCancel;
+
+ /** */
+ private readonly Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> _streamReceiver;
+
+ /** */
+ private readonly Func<object, object> _streamTransformerCtor;
+
+ /// <summary>
+ /// Gets the <see cref="IComputeFunc{T}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object> GetComputeOutFunc(Type type)
+ {
+ return Get(type)._computeOutFunc;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IComputeFunc{T, R}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object, object> GetComputeFunc(Type type)
+ {
+ return Get(type)._computeFunc;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IEventFilter{T}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, Guid, object, bool> GetEventFilter(Type type)
+ {
+ return Get(type)._eventFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="ICacheEntryFilter{TK,TV}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object, object, bool> GetCacheEntryFilter(Type type)
+ {
+ return Get(type)._cacheEntryFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="ICacheEntryProcessor{K, V, A, R}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, IMutableCacheEntryInternal, object, object> GetCacheEntryProcessor(Type type)
+ {
+ return Get(type)._cacheEntryProcessor.Item1;
+ }
+
+ /// <summary>
+ /// Gets key and value types for the <see cref="ICacheEntryProcessor{K, V, A, R}" />.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Key and value types.</returns>
+ public static Tuple<Type, Type> GetCacheEntryProcessorTypes(Type type)
+ {
+ return Get(type)._cacheEntryProcessor.Item2;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IMessageFilter{T}" /> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, Guid, object, bool> GetMessageFilter(Type type)
+ {
+ return Get(type)._messageFilter;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IComputeJob{T}.Execute" /> and <see cref="IComputeJob{T}.Cancel" /> invocators.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <param name="execute">Execute invocator.</param>
+ /// <param name="cancel">Cancel invocator.</param>
+ public static void GetComputeJob(Type type, out Func<object, object> execute, out Action<object> cancel)
+ {
+ var desc = Get(type);
+
+ execute = desc._computeJobExecute;
+ cancel = desc._computeJobCancel;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="IStreamReceiver{TK,TV}"/> invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool> GetStreamReceiver(Type type)
+ {
+ return Get(type)._streamReceiver;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="StreamTransformer{K, V, A, R}"/>> ctor invocator.
+ /// </summary>
+ /// <param name="type">Type.</param>
+ /// <returns>Precompiled invocator delegate.</returns>
+ public static Func<object, object> GetStreamTransformerCtor(Type type)
+ {
+ return Get(type)._streamTransformerCtor;
+ }
+
+ /// <summary>
+ /// Gets the <see cref="DelegateTypeDescriptor" /> by type.
+ /// </summary>
+ private static DelegateTypeDescriptor Get(Type type)
+ {
+ DelegateTypeDescriptor result;
+
+ return Descriptors.TryGetValue(type, out result)
+ ? result
+ : Descriptors.GetOrAdd(type, t => new DelegateTypeDescriptor(t));
+ }
+
+ /// <summary>
+ /// Throws an exception if first argument is not null.
+ /// </summary>
+ // ReSharper disable once UnusedParameter.Local
+ private static void ThrowIfMultipleInterfaces(object check, Type userType, Type interfaceType)
+ {
+ if (check != null)
+ throw new InvalidOperationException(
+ string.Format("Not Supported: Type {0} implements interface {1} multiple times.", userType,
+ interfaceType));
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="DelegateTypeDescriptor"/> class.
+ /// </summary>
+ /// <param name="type">The type.</param>
+ private DelegateTypeDescriptor(Type type)
+ {
+ foreach (var iface in type.GetInterfaces())
+ {
+ if (!iface.IsGenericType)
+ continue;
+
+ var genericTypeDefinition = iface.GetGenericTypeDefinition();
+
+ if (genericTypeDefinition == typeof (IComputeFunc<>))
+ {
+ ThrowIfMultipleInterfaces(_computeOutFunc, type, typeof(IComputeFunc<>));
+
+ _computeOutFunc = DelegateConverter.CompileFunc(iface);
+ }
+ else if (genericTypeDefinition == typeof (IComputeFunc<,>))
+ {
+ ThrowIfMultipleInterfaces(_computeFunc, type, typeof(IComputeFunc<,>));
+
+ var args = iface.GetGenericArguments();
+
+ _computeFunc = DelegateConverter.CompileFunc<Func<object, object, object>>(iface, new[] {args[0]});
+ }
+ else if (genericTypeDefinition == typeof (IEventFilter<>))
+ {
+ ThrowIfMultipleInterfaces(_eventFilter, type, typeof(IEventFilter<>));
+
+ var args = iface.GetGenericArguments();
+
+ _eventFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+ new[] {typeof (Guid), args[0]}, new[] {false, true, false});
+ }
+ else if (genericTypeDefinition == typeof (ICacheEntryFilter<,>))
+ {
+ ThrowIfMultipleInterfaces(_cacheEntryFilter, type, typeof(ICacheEntryFilter<,>));
+
+ var args = iface.GetGenericArguments();
+
+ var entryType = typeof (ICacheEntry<,>).MakeGenericType(args);
+
+ var invokeFunc = DelegateConverter.CompileFunc<Func<object, object, bool>>(iface,
+ new[] { entryType }, new[] { true, false });
+
+ var ctor = DelegateConverter.CompileCtor<Func<object, object, object>>(
+ typeof (CacheEntry<,>).MakeGenericType(args), args);
+
+ // Resulting func constructs CacheEntry and passes it to user implementation
+ _cacheEntryFilter = (obj, k, v) => invokeFunc(obj, ctor(k, v));
+ }
+ else if (genericTypeDefinition == typeof (ICacheEntryProcessor<,,,>))
+ {
+ ThrowIfMultipleInterfaces(_cacheEntryProcessor, type, typeof(ICacheEntryProcessor<,,,>));
+
+ var args = iface.GetGenericArguments();
+
+ var entryType = typeof (IMutableCacheEntry<,>).MakeGenericType(args[0], args[1]);
+
+ var func = DelegateConverter.CompileFunc<Func<object, object, object, object>>(iface,
+ new[] { entryType, args[2] }, null, "Process");
+
+ var types = new Tuple<Type, Type>(args[0], args[1]);
+
+ _cacheEntryProcessor = new Tuple<Func<object, IMutableCacheEntryInternal, object, object>, Tuple<Type, Type>>
+ (func, types);
+
+ var transformerType = typeof (StreamTransformer<,,,>).MakeGenericType(args);
+
+ _streamTransformerCtor = DelegateConverter.CompileCtor<Func<object, object>>(transformerType,
+ new[] {iface});
+ }
+ else if (genericTypeDefinition == typeof (IMessageFilter<>))
+ {
+ ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IMessageFilter<>));
+
+ var arg = iface.GetGenericArguments()[0];
+
+ _messageFilter = DelegateConverter.CompileFunc<Func<object, Guid, object, bool>>(iface,
+ new[] { typeof(Guid), arg }, new[] { false, true, false });
+ }
+ else if (genericTypeDefinition == typeof (IComputeJob<>))
+ {
+ ThrowIfMultipleInterfaces(_messageFilter, type, typeof(IComputeJob<>));
+
+ _computeJobExecute = DelegateConverter.CompileFunc<Func<object, object>>(iface, new Type[0],
+ methodName: "Execute");
+
+ _computeJobCancel = DelegateConverter.CompileFunc<Action<object>>(iface, new Type[0],
+ new[] {false}, "Cancel");
+ }
+ else if (genericTypeDefinition == typeof (IStreamReceiver<,>))
+ {
+ ThrowIfMultipleInterfaces(_streamReceiver, type, typeof (IStreamReceiver<,>));
+
+ var method =
+ typeof (StreamReceiverHolder).GetMethod("InvokeReceiver")
+ .MakeGenericMethod(iface.GetGenericArguments());
+
+ _streamReceiver = DelegateConverter
+ .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IPortableStream, bool>>(
+ typeof (StreamReceiverHolder),
+ method,
+ new[]
+ {
+ iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IPortableStream),
+ typeof (bool)
+ },
+ new[] {true, false, false, false, false, false});
+ }
+ }
+ }
+ }
+}
\ No newline at end of file