You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/08/24 14:04:40 UTC
[17/50] [abbrv] ignite git commit: GG-11293: .NET: Backported
affinity functions feature to 7.5.30.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
new file mode 100644
index 0000000..6067af4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Events;
+ using Apache.Ignite.Core.Impl;
+
+ /// <summary>
+ /// Affinity function context.
+ /// </summary>
+ public class AffinityFunctionContext
+ {
+ /** */
+ private readonly List<List<IClusterNode>> _previousAssignment;
+
+ /** */
+ private readonly int _backups;
+
+ /** */
+ private readonly ICollection<IClusterNode> _currentTopologySnapshot;
+
+ /** */
+ private readonly AffinityTopologyVersion _currentTopologyVersion;
+
+ /** */
+ private readonly DiscoveryEvent _discoveryEvent;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AffinityFunctionContext"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ internal AffinityFunctionContext(IBinaryRawReader reader)
+ {
+ Debug.Assert(reader != null);
+
+ _currentTopologySnapshot = IgniteUtils.ReadNodes(reader);
+ _backups = reader.ReadInt();
+ _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt());
+ _discoveryEvent = EventReader.Read<DiscoveryEvent>(reader);
+
+ // Prev assignment
+ var cnt = reader.ReadInt();
+
+ if (cnt > 0)
+ {
+ _previousAssignment = new List<List<IClusterNode>>(cnt);
+
+ for (var i = 0; i < cnt; i++)
+ _previousAssignment.Add(IgniteUtils.ReadNodes(reader));
+ }
+ }
+
+ /// <summary>
+ /// Gets the affinity assignment for given partition on previous topology version.
+ /// First node in returned list is a primary node, other nodes are backups.
+ /// </summary>
+ /// <param name="partition">The partition to get previous assignment for.</param>
+ /// <returns>
+ /// List of nodes assigned to a given partition on previous topology version or <code>null</code>
+ /// if this information is not available.
+ /// </returns>
+ public ICollection<IClusterNode> GetPreviousAssignment(int partition)
+ {
+ return _previousAssignment == null ? null : _previousAssignment[partition];
+ }
+
+ /// <summary>
+ /// Gets number of backups for new assignment.
+ /// </summary>
+ public int Backups
+ {
+ get { return _backups; }
+ }
+
+ /// <summary>
+ /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular
+ /// cache is configured. List of passed nodes is guaranteed to be sorted in a same order
+ /// on all nodes on which partition assignment is performed.
+ /// </summary>
+ public ICollection<IClusterNode> CurrentTopologySnapshot
+ {
+ get { return _currentTopologySnapshot; }
+ }
+
+ /// <summary>
+ /// Gets the current topology version.
+ /// </summary>
+ public AffinityTopologyVersion CurrentTopologyVersion
+ {
+ get { return _currentTopologyVersion; }
+ }
+
+ /// <summary>
+ /// Gets the discovery event that caused the topology change.
+ /// </summary>
+ public DiscoveryEvent DiscoveryEvent
+ {
+ get { return _discoveryEvent; }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
new file mode 100644
index 0000000..9bfdfb4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+ using System;
+ using Apache.Ignite.Core.Cluster;
+
+ /// <summary>
+ /// Affinity topology version.
+ /// </summary>
+ public struct AffinityTopologyVersion : IEquatable<AffinityTopologyVersion>
+ {
+ /** */
+ private readonly long _version;
+
+ /** */
+ private readonly int _minorVersion;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AffinityTopologyVersion"/> struct.
+ /// </summary>
+ /// <param name="version">The version.</param>
+ /// <param name="minorVersion">The minor version.</param>
+ public AffinityTopologyVersion(long version, int minorVersion)
+ {
+ _version = version;
+ _minorVersion = minorVersion;
+ }
+
+ /// <summary>
+ /// Gets the major version, same as <see cref="ICluster.TopologyVersion"/>.
+ /// </summary>
+ public long Version
+ {
+ get { return _version; }
+ }
+
+ /// <summary>
+ /// Gets the minor version, which is increased when new caches start.
+ /// </summary>
+ public int MinorVersion
+ {
+ get { return _minorVersion; }
+ }
+
+ /// <summary>
+ /// Indicates whether the current object is equal to another object of the same type.
+ /// </summary>
+ /// <param name="other">An object to compare with this object.</param>
+ /// <returns>
+ /// true if the current object is equal to the <paramref name="other" /> parameter; otherwise, false.
+ /// </returns>
+ public bool Equals(AffinityTopologyVersion other)
+ {
+ return _version == other._version && _minorVersion == other._minorVersion;
+ }
+
+ /// <summary>
+ /// Determines whether the specified <see cref="System.Object" />, is equal to this instance.
+ /// </summary>
+ /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
+ /// <returns>
+ /// <c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise,
+ /// <c>false</c>.
+ /// </returns>
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ return obj is AffinityTopologyVersion && Equals((AffinityTopologyVersion) obj);
+ }
+
+ /// <summary>
+ /// Returns a hash code for this instance.
+ /// </summary>
+ /// <returns>
+ /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table.
+ /// </returns>
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return (_version.GetHashCode()*397) ^ _minorVersion;
+ }
+ }
+
+ /// <summary>
+ /// Implements the operator ==.
+ /// </summary>
+ /// <param name="left">The left.</param>
+ /// <param name="right">The right.</param>
+ /// <returns>
+ /// The result of the operator.
+ /// </returns>
+ public static bool operator ==(AffinityTopologyVersion left, AffinityTopologyVersion right)
+ {
+ return left.Equals(right);
+ }
+
+ /// <summary>
+ /// Implements the operator !=.
+ /// </summary>
+ /// <param name="left">The left.</param>
+ /// <param name="right">The right.</param>
+ /// <returns>
+ /// The result of the operator.
+ /// </returns>
+ public static bool operator !=(AffinityTopologyVersion left, AffinityTopologyVersion right)
+ {
+ return !left.Equals(right);
+ }
+
+ /// <summary>
+ /// Returns a <see cref="string" /> that represents this instance.
+ /// </summary>
+ /// <returns>
+ /// A <see cref="string" /> that represents this instance.
+ /// </returns>
+ public override string ToString()
+ {
+ return string.Format("AffinityTopologyVersion [Version={0}, MinorVersion={1}]", _version, _minorVersion);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs
new file mode 100644
index 0000000..4a3885f
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity.Fair
+{
+ using System;
+
+ /// <summary>
+ /// Fair affinity function which tries to ensure that all nodes get equal number of partitions with
+ /// minimum amount of reassignments between existing nodes.
+ /// </summary>
+ [Serializable]
+ public class FairAffinityFunction : AffinityFunctionBase
+ {
+ // No-op.
+ // Actual implementation is in Java, see AffinityFunctionSerializer.Write method.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
new file mode 100644
index 0000000..b6c190c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/IAffinityFunction.cs
@@ -0,0 +1,82 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+ using System;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cache.Affinity.Fair;
+ using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+ using Apache.Ignite.Core.Cluster;
+
+ /// <summary>
+ /// Represents a function that maps cache keys to cluster nodes.
+ /// <para />
+ /// Predefined implementations:
+ /// <see cref="RendezvousAffinityFunction"/>, <see cref="FairAffinityFunction"/>.
+ /// </summary>
+ public interface IAffinityFunction
+ {
+ /// <summary>
+ /// Gets the total number of partitions.
+ /// <para />
+ /// All caches should always provide correct partition count which should be the same on all
+ /// participating nodes. Note that partitions should always be numbered from 0 inclusively
+ /// to N exclusively without any gaps.
+ /// </summary>
+ int Partitions { get; }
+
+ /// <summary>
+ /// Gets partition number for a given key starting from 0. Partitioned caches
+ /// should make sure that keys are about evenly distributed across all partitions
+ /// from 0 to <see cref="Partitions"/> for best performance.
+ /// <para />
+ /// Note that for fully replicated caches it is possible to segment key sets among different
+ /// grid node groups. In that case each node group should return a unique partition
+ /// number. However, unlike partitioned cache, mappings of keys to nodes in
+ /// replicated caches are constant and a node cannot migrate from one partition
+ /// to another.
+ /// </summary>
+ /// <param name="key">Key to get partition for.</param>
+ /// <returns>Partition number for a given key.</returns>
+ int GetPartition(object key);
+
+ /// <summary>
+ /// Removes node from affinity. This method is called when it is safe to remove
+ /// disconnected node from affinity mapping.
+ /// </summary>
+ /// <param name="nodeId">The node identifier.</param>
+ void RemoveNode(Guid nodeId);
+
+ /// <summary>
+ /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+ /// nodes are updated in the same manner. In case of partitioned cache, the returned
+ /// list should contain only the primary and back up nodes with primary node being
+ /// always first.
+ /// <pare />
+ /// Note that partitioned affinity must obey the following contract: given that node
+ /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+ /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+ /// </summary>
+ /// <param name="context">The affinity function context.</param>
+ /// <returns>
+ /// A collection of partitions, where each partition is a collection of nodes,
+ /// where first node is a primary node, and other nodes are backup nodes.
+ /// </returns>
+ IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
new file mode 100644
index 0000000..98ec364
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity.Rendezvous
+{
+ using System;
+
+ /// <summary>
+ /// Affinity function for partitioned cache based on Highest Random Weight algorithm.
+ /// </summary>
+ [Serializable]
+ public class RendezvousAffinityFunction : AffinityFunctionBase
+ {
+ // No-op.
+ // Actual implementation is in Java, see AffinityFunctionSerializer.Write method.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
index cb1c715..ee1c837 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Events/EventReader.cs
@@ -32,16 +32,14 @@ namespace Apache.Ignite.Core.Events
/// <param name="reader">Reader.</param>
/// <returns>Deserialized event.</returns>
/// <exception cref="System.InvalidCastException">Incompatible event type.</exception>
- public static T Read<T>(IBinaryReader reader) where T : IEvent
+ public static T Read<T>(IBinaryRawReader reader) where T : IEvent
{
- var r = reader.GetRawReader();
-
- var clsId = r.ReadInt();
+ var clsId = reader.ReadInt();
if (clsId == -1)
return default(T);
- return (T) CreateInstance(clsId, r);
+ return (T) CreateInstance(clsId, reader);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
index 7a3fafc..f6fab5d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs
@@ -26,10 +26,12 @@ namespace Apache.Ignite.Core
using System.Runtime;
using System.Threading;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Cache.Affinity;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Impl.Handle;
using Apache.Ignite.Core.Impl.Memory;
@@ -233,6 +235,8 @@ namespace Apache.Ignite.Core
PrepareConfiguration(reader);
PrepareLifecycleBeans(reader, outStream, handleRegistry);
+
+ PrepareAffinityFunctions(reader, outStream);
}
catch (Exception e)
{
@@ -282,7 +286,7 @@ namespace Apache.Ignite.Core
int cnt = reader.ReadInt();
for (int i = 0; i < cnt; i++)
- beans.Add(new LifecycleBeanHolder(CreateLifecycleBean(reader)));
+ beans.Add(new LifecycleBeanHolder(CreateObject<ILifecycleBean>(reader)));
// 2. Append beans definied in local configuration.
ICollection<ILifecycleBean> nativeBeans = _startup.Configuration.LifecycleBeans;
@@ -306,21 +310,33 @@ namespace Apache.Ignite.Core
}
/// <summary>
- /// Create lifecycle bean.
+ /// Prepares the affinity functions.
/// </summary>
- /// <param name="reader">Reader.</param>
- /// <returns>Lifecycle bean.</returns>
- private static ILifecycleBean CreateLifecycleBean(BinaryReader reader)
+ private static void PrepareAffinityFunctions(BinaryReader reader, PlatformMemoryStream outStream)
{
- // 1. Instantiate.
- var bean = IgniteUtils.CreateInstance<ILifecycleBean>(reader.ReadString());
+ var cnt = reader.ReadInt();
+
+ var writer = reader.Marshaller.StartMarshal(outStream);
- // 2. Set properties.
- var props = reader.ReadDictionaryAsGeneric<string, object>();
+ for (var i = 0; i < cnt; i++)
+ {
+ var objHolder = new ObjectInfoHolder(reader);
+ AffinityFunctionSerializer.Write(writer, objHolder.CreateInstance<IAffinityFunction>(), objHolder);
+ }
+ }
+
+ /// <summary>
+ /// Creates an object and sets the properties.
+ /// </summary>
+ /// <param name="reader">Reader.</param>
+ /// <returns>Resulting object.</returns>
+ private static T CreateObject<T>(IBinaryRawReader reader)
+ {
+ var res = IgniteUtils.CreateInstance<T>(reader.ReadString());
- IgniteUtils.SetProperties(bean, props);
+ IgniteUtils.SetProperties(res, reader.ReadDictionaryAsGeneric<string, object>());
- return bean;
+ return res;
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
index c3dcc3a..a5446ac 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Impl.Binary
{
using System.Collections.Generic;
using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Impl.Common;
/// <summary>
/// Reader extensions.
@@ -48,5 +49,18 @@ namespace Apache.Ignite.Core.Impl.Binary
{
return (Dictionary<TKey, TValue>) reader.ReadDictionary(size => new Dictionary<TKey, TValue>(size));
}
+
+ /// <summary>
+ /// Reads the object either as a normal object or as a [typeName+props] wrapper.
+ /// </summary>
+ public static T ReadObjectEx<T>(this IBinaryRawReader reader)
+ {
+ var obj = reader.ReadObject<object>();
+
+ if (obj == null)
+ return default(T);
+
+ return obj is T ? (T)obj : ((ObjectInfoHolder)obj).CreateInstance<T>();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
index 81fc195..a63e8f4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs
@@ -506,12 +506,15 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <summary>
/// Adds a predefined system type.
/// </summary>
- private void AddSystemType<T>(byte typeId, Func<BinaryReader, T> ctor) where T : IBinaryWriteAware
+ private void AddSystemType<T>(int typeId, Func<BinaryReader, T> ctor) where T : IBinaryWriteAware
{
var type = typeof(T);
var serializer = new BinarySystemTypeSerializer<T>(ctor);
+ if (typeId == 0)
+ typeId = BinaryUtils.TypeId(type.Name, null, null);
+
AddType(type, typeId, BinaryUtils.GetTypeName(type), false, false, null, null, serializer, null, false);
}
@@ -536,6 +539,7 @@ namespace Apache.Ignite.Core.Impl.Binary
AddSystemType(BinaryUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w));
AddSystemType(BinaryUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w));
AddSystemType(BinaryUtils.TypePlatformJavaObjectFactoryProxy, w => new PlatformJavaObjectFactoryProxy());
+ AddSystemType(0, w => new ObjectInfoHolder(w));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
new file mode 100644
index 0000000..888445a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/AffinityFunctionSerializer.cs
@@ -0,0 +1,277 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.Linq;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Cache.Affinity;
+ using Apache.Ignite.Core.Cache.Affinity.Fair;
+ using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Memory;
+
+ /// <summary>
+ /// Affinity function read/write methods.
+ /// </summary>
+ internal static class AffinityFunctionSerializer
+ {
+ /** */
+ private const byte TypeCodeNull = 0;
+
+ /** */
+ private const byte TypeCodeFair = 1;
+
+ /** */
+ private const byte TypeCodeRendezvous = 2;
+
+ /** */
+ private const byte TypeCodeUser = 3;
+
+ /// <summary>
+ /// Writes the instance.
+ /// </summary>
+ internal static void Write(IBinaryRawWriter writer, IAffinityFunction fun, object userFuncOverride = null)
+ {
+ Debug.Assert(writer != null);
+
+ if (fun == null)
+ {
+ writer.WriteByte(TypeCodeNull);
+ return;
+ }
+
+ // 1) Type code
+ // 2) Partitions
+ // 3) ExcludeNeighbors
+ // 4) Override flags
+ // 5) User object
+
+ var p = fun as AffinityFunctionBase;
+
+ if (p != null)
+ {
+ writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
+ writer.WriteInt(p.Partitions);
+ writer.WriteBoolean(p.ExcludeNeighbors);
+
+ var overrideFlags = GetOverrideFlags(p.GetType());
+ writer.WriteByte((byte) overrideFlags);
+
+ // Do not write user func if there is nothing overridden
+ WriteUserFunc(writer, overrideFlags != UserOverrides.None ? fun : null, userFuncOverride);
+ }
+ else
+ {
+ writer.WriteByte(TypeCodeUser);
+ writer.WriteInt(fun.Partitions);
+ writer.WriteBoolean(false); // Exclude neighbors
+ writer.WriteByte((byte) UserOverrides.All);
+ WriteUserFunc(writer, fun, userFuncOverride);
+ }
+ }
+
+ /// <summary>
+ /// Reads the instance.
+ /// </summary>
+ internal static IAffinityFunction Read(IBinaryRawReader reader)
+ {
+ Debug.Assert(reader != null);
+
+ var typeCode = reader.ReadByte();
+
+ if (typeCode == TypeCodeNull)
+ return null;
+
+ var partitions = reader.ReadInt();
+ var exclNeighbors = reader.ReadBoolean();
+ var overrideFlags = (UserOverrides)reader.ReadByte();
+ var userFunc = reader.ReadObjectEx<IAffinityFunction>();
+
+ if (userFunc != null)
+ {
+ Debug.Assert(overrideFlags != UserOverrides.None);
+
+ var fair = userFunc as FairAffinityFunction;
+ if (fair != null)
+ {
+ fair.Partitions = partitions;
+ fair.ExcludeNeighbors = exclNeighbors;
+ }
+
+ var rendezvous = userFunc as RendezvousAffinityFunction;
+ if (rendezvous != null)
+ {
+ rendezvous.Partitions = partitions;
+ rendezvous.ExcludeNeighbors = exclNeighbors;
+ }
+
+ return userFunc;
+ }
+
+ Debug.Assert(overrideFlags == UserOverrides.None);
+ AffinityFunctionBase fun;
+
+ switch (typeCode)
+ {
+ case TypeCodeFair:
+ fun = new FairAffinityFunction();
+ break;
+ case TypeCodeRendezvous:
+ fun = new RendezvousAffinityFunction();
+ break;
+ default:
+ throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode);
+ }
+
+ fun.Partitions = partitions;
+ fun.ExcludeNeighbors = exclNeighbors;
+
+ return fun;
+ }
+
+
+ /// <summary>
+ /// Writes the partitions assignment to a stream.
+ /// </summary>
+ /// <param name="parts">The parts.</param>
+ /// <param name="stream">The stream.</param>
+ /// <param name="marsh">The marshaller.</param>
+ internal static void WritePartitions(IEnumerable<IEnumerable<IClusterNode>> parts,
+ PlatformMemoryStream stream, Marshaller marsh)
+ {
+ Debug.Assert(parts != null);
+ Debug.Assert(stream != null);
+ Debug.Assert(marsh != null);
+
+ IBinaryRawWriter writer = marsh.StartMarshal(stream);
+
+ var partCnt = 0;
+ writer.WriteInt(partCnt); // reserve size
+
+ foreach (var part in parts)
+ {
+ if (part == null)
+ throw new IgniteException("IAffinityFunction.AssignPartitions() returned invalid partition: null");
+
+ partCnt++;
+
+ var nodeCnt = 0;
+ var cntPos = stream.Position;
+ writer.WriteInt(nodeCnt); // reserve size
+
+ foreach (var node in part)
+ {
+ nodeCnt++;
+ writer.WriteGuid(node.Id);
+ }
+
+ var endPos = stream.Position;
+ stream.Seek(cntPos, SeekOrigin.Begin);
+ stream.WriteInt(nodeCnt);
+ stream.Seek(endPos, SeekOrigin.Begin);
+ }
+
+ stream.SynchronizeOutput();
+ stream.Seek(0, SeekOrigin.Begin);
+ writer.WriteInt(partCnt);
+ }
+
+ /// <summary>
+ /// Reads the partitions assignment from a stream.
+ /// </summary>
+ /// <param name="stream">The stream.</param>
+ /// <param name="marsh">The marshaller.</param>
+ /// <returns>Partitions assignment.</returns>
+ internal static IEnumerable<IEnumerable<IClusterNode>> ReadPartitions(IBinaryStream stream, Marshaller marsh)
+ {
+ Debug.Assert(stream != null);
+ Debug.Assert(marsh != null);
+
+ IBinaryRawReader reader = marsh.StartUnmarshal(stream);
+
+ var partCnt = reader.ReadInt();
+
+ var res = new List<IEnumerable<IClusterNode>>(partCnt);
+
+ for (var i = 0; i < partCnt; i++)
+ res.Add(IgniteUtils.ReadNodes(reader));
+
+ return res;
+ }
+
+ /// <summary>
+ /// Gets the override flags.
+ /// </summary>
+ private static UserOverrides GetOverrideFlags(Type funcType)
+ {
+ var res = UserOverrides.None;
+
+ var methods = new[] {UserOverrides.GetPartition, UserOverrides.AssignPartitions, UserOverrides.RemoveNode};
+
+ var map = funcType.GetInterfaceMap(typeof(IAffinityFunction));
+
+ foreach (var method in methods)
+ {
+ // Find whether user type overrides IAffinityFunction method from AffinityFunctionBase.
+ var methodName = method.ToString();
+
+ if (map.TargetMethods.Single(x => x.Name == methodName).DeclaringType != typeof(AffinityFunctionBase))
+ res |= method;
+ }
+
+ return res;
+ }
+
+ /// <summary>
+ /// Writes the user function.
+ /// </summary>
+ private static void WriteUserFunc(IBinaryRawWriter writer, IAffinityFunction func, object funcOverride)
+ {
+ if (funcOverride != null)
+ {
+ writer.WriteObject(funcOverride);
+ return;
+ }
+
+ if (func != null && !func.GetType().IsSerializable)
+ throw new IgniteException("AffinityFunction should be serializable.");
+
+ writer.WriteObject(func);
+ }
+
+ /// <summary>
+ /// Overridden function flags.
+ /// </summary>
+ [Flags]
+ private enum UserOverrides : byte
+ {
+ None = 0,
+ GetPartition = 1,
+ RemoveNode = 1 << 1,
+ AssignPartitions = 1 << 2,
+ All = GetPartition | RemoveNode | AssignPartitions
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
new file mode 100644
index 0000000..d335804
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs
@@ -0,0 +1,74 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Cache.Affinity
+{
+ using System;
+ using System.Collections.Generic;
+ using Apache.Ignite.Core.Cache.Affinity;
+ using Apache.Ignite.Core.Cluster;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Unmanaged;
+
+ /// <summary>
+ /// Affinity function that delegates to Java.
+ /// </summary>
+ internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction
+ {
+ /** Opcodes. */
+ private enum Op
+ {
+ Partition = 1,
+ RemoveNode = 2,
+ AssignPartitions = 3
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class.
+ /// </summary>
+ /// <param name="target">Target.</param>
+ /// <param name="marsh">Marshaller.</param>
+ public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh)
+ {
+ // No-op.
+ }
+
+ /** <inheritdoc /> */
+ public int Partitions
+ {
+ get { throw new NotSupportedException("PlatformAffinityFunction.Partitions is not supported."); }
+ }
+
+ /** <inheritdoc /> */
+ public int GetPartition(object key)
+ {
+ return (int) DoOutOp((int) Op.Partition, w => w.WriteObject(key));
+ }
+
+ /** <inheritdoc /> */
+ public void RemoveNode(Guid nodeId)
+ {
+ DoOutOp((int) Op.RemoveNode, w => w.WriteGuid(nodeId));
+ }
+
+ /** <inheritdoc /> */
+ public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+ {
+ return DoInOp((int) Op.AssignPartitions, s => AffinityFunctionSerializer.ReadPartitions(s, Marshaller));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
new file mode 100644
index 0000000..407fe0c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/ObjectInfoHolder.cs
@@ -0,0 +1,86 @@
+\ufeff/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Common
+{
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Binary;
+ using Apache.Ignite.Core.Impl.Binary;
+
+ /// <summary>
+ /// Holds the information to instantiate an object and set its properties.
+ /// Typically used for .NET objects defined in Spring XML.
+ /// </summary>
+ internal class ObjectInfoHolder : IBinaryWriteAware
+ {
+ /** Type name. */
+ private readonly string _typeName;
+
+ /** Properties. */
+ private readonly Dictionary<string, object> _properties;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ObjectInfoHolder"/> class.
+ /// </summary>
+ /// <param name="reader">The reader.</param>
+ public ObjectInfoHolder(IBinaryRawReader reader)
+ {
+ Debug.Assert(reader != null);
+
+ _typeName = reader.ReadString();
+ _properties = reader.ReadDictionaryAsGeneric<string, object>();
+
+ Debug.Assert(!string.IsNullOrEmpty(_typeName));
+ }
+
+ /// <summary>
+ /// Gets the name of the type.
+ /// </summary>
+ public string TypeName
+ {
+ get { return _typeName; }
+ }
+
+ /// <summary>
+ /// Gets the properties.
+ /// </summary>
+ public Dictionary<string, object> Properties
+ {
+ get { return _properties; }
+ }
+
+ /// <summary>
+ /// Creates an instance according to type name and properties.
+ /// </summary>
+ public T CreateInstance<T>()
+ {
+ return IgniteUtils.CreateInstance<T>(TypeName, Properties);
+ }
+
+ /** <inheritdoc /> */
+ public void WriteBinary(IBinaryWriter writer)
+ {
+ Debug.Assert(writer != null);
+
+ var w = writer.GetRawWriter();
+
+ w.WriteString(_typeName);
+ w.WriteDictionary(_properties);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
index 7929a5d..5d0d989 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteUtils.cs
@@ -128,8 +128,9 @@ namespace Apache.Ignite.Core.Impl
/// Create new instance of specified class.
/// </summary>
/// <param name="typeName">Class name</param>
+ /// <param name="props">Properties to set.</param>
/// <returns>New Instance.</returns>
- public static T CreateInstance<T>(string typeName)
+ public static T CreateInstance<T>(string typeName, IEnumerable<KeyValuePair<string, object>> props = null)
{
IgniteArgumentCheck.NotNullOrEmpty(typeName, "typeName");
@@ -138,7 +139,12 @@ namespace Apache.Ignite.Core.Impl
if (type == null)
throw new IgniteException("Failed to create class instance [className=" + typeName + ']');
- return (T) Activator.CreateInstance(type);
+ var res = (T)Activator.CreateInstance(type);
+
+ if (props != null)
+ SetProperties(res, props);
+
+ return res;
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
index 8147e9d..dd16d03 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbackHandlers.cs
@@ -95,5 +95,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
internal void* extensionCbInLongOutLong;
internal void* extensionCbInLongLongOutLong;
+
+ internal void* affinityFunctionInit;
+ internal void* affinityFunctionPartition;
+ internal void* affinityFunctionAssignPartitions;
+ internal void* affinityFunctionRemoveNode;
+ internal void* affinityFunctionDestroy;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index e554cfc..f4b3db9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -21,14 +21,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
+ using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
-
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
using Apache.Ignite.Core.Impl.Cache;
+ using Apache.Ignite.Core.Impl.Cache.Affinity;
using Apache.Ignite.Core.Impl.Cache.Query.Continuous;
using Apache.Ignite.Core.Impl.Cache.Store;
using Apache.Ignite.Core.Impl.Common;
@@ -161,6 +163,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1);
private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2);
+ private delegate long AffinityFunctionInitDelegate(void* target, long memPtr, void* baseFunc);
+ private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr);
+ private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr);
+ private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr);
+ private delegate void AffinityFunctionDestroyDelegate(void* target, long ptr);
+
/// <summary>
/// constructor.
/// </summary>
@@ -240,7 +248,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
error = CreateFunctionPointer((ErrorCallbackDelegate)Error),
extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong),
- extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong)
+ extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong),
+
+ affinityFunctionInit = CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit),
+ affinityFunctionPartition = CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition),
+ affinityFunctionAssignPartitions = CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions),
+ affinityFunctionRemoveNode = CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode),
+ affinityFunctionDestroy = CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy)
};
_cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize());
@@ -1057,7 +1071,120 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
#endregion
-
+
+ #region AffinityFunction
+
+ private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc)
+ {
+ return SafeCall(() =>
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
+
+ var func = reader.ReadObjectEx<IAffinityFunction>();
+
+ ResourceProcessor.Inject(func, _ignite);
+
+ var affBase = func as AffinityFunctionBase;
+
+ if (affBase != null)
+ affBase.SetBaseFunction(new PlatformAffinityFunction(
+ _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller));
+
+ return _handleRegistry.Allocate(func);
+ }
+ });
+ }
+
+ private int AffinityFunctionPartition(void* target, long ptr, long memPtr)
+ {
+ return SafeCall(() =>
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ var key = _ignite.Marshaller.Unmarshal<object>(stream);
+
+ return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key);
+ }
+ });
+ }
+
+ private void AffinityFunctionAssignPartitions(void* target, long ptr, long inMemPtr, long outMemPtr)
+ {
+ SafeCall(() =>
+ {
+ using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream())
+ {
+ var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream));
+ var func = _handleRegistry.Get<IAffinityFunction>(ptr, true);
+ var parts = func.AssignPartitions(ctx);
+
+ if (parts == null)
+ throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null");
+
+ using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream())
+ {
+ var writer = _ignite.Marshaller.StartMarshal(outStream);
+
+ var partCnt = 0;
+ writer.WriteInt(partCnt); // reserve size
+
+ foreach (var part in parts)
+ {
+ if (part == null)
+ throw new IgniteException(func.GetType() +
+ ".AssignPartitions() returned invalid partition: null");
+
+ partCnt++;
+
+ var nodeCnt = 0;
+ var cntPos = outStream.Position;
+ writer.WriteInt(nodeCnt); // reserve size
+
+ foreach (var node in part)
+ {
+ nodeCnt++;
+ writer.WriteGuid(node.Id);
+ }
+
+ var endPos = outStream.Position;
+ outStream.Seek(cntPos, SeekOrigin.Begin);
+ outStream.WriteInt(nodeCnt);
+ outStream.Seek(endPos, SeekOrigin.Begin);
+ }
+
+ outStream.SynchronizeOutput();
+ outStream.Seek(0, SeekOrigin.Begin);
+ writer.WriteInt(partCnt);
+ }
+ }
+ });
+ }
+
+ private void AffinityFunctionRemoveNode(void* target, long ptr, long memPtr)
+ {
+ SafeCall(() =>
+ {
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
+
+ _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId);
+ }
+ });
+ }
+
+ private void AffinityFunctionDestroy(void* target, long ptr)
+ {
+ SafeCall(() =>
+ {
+ _handleRegistry.Release(ptr);
+ });
+ }
+
+ #endregion
+
#region HELPERS
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]