You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2019/06/06 21:18:29 UTC
[ignite] branch master updated: IGNITE-9876 .NET: Implement Thin
Client best-effort affinity
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 5832217 IGNITE-9876 .NET: Implement Thin Client best-effort affinity
5832217 is described below
commit 58322178ff8b3b47f438e07e4a5e3760d7b30748
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Jun 7 00:18:17 2019 +0300
IGNITE-9876 .NET: Implement Thin Client best-effort affinity
Routes single-key cache operation requests to primary nodes,
saving potential extra network hop.
First iteration, considered an experimental feature:
* No reconnect to failed nodes
* No support for custom affinity keys
Disabled by default.
#6596
---
.../Apache.Ignite.Core.Tests.csproj | 4 +
.../Client/Cache/AffinityAwarenessTest.cs | 509 +++++++++++++++++++++
.../Client/Cache/ListLogger.cs | 66 +++
.../Client/Cache/TestKey.cs | 52 +++
.../Client/Cache/TestKeyWithAffinity.cs | 56 +++
.../Client/ClientConnectionTest.cs | 5 +-
.../Client/ClientTestBase.cs | 12 +-
.../Client/IgniteClientConfigurationTest.cs | 3 +-
.../Config/Client/IgniteClientConfiguration.xml | 38 +-
.../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 7 +
.../Cache/Affinity/AffinityTopologyVersion.cs | 78 +++-
.../Client/IgniteClientConfiguration.cs | 11 +
.../IgniteClientConfigurationSection.xsd | 5 +
.../Impl/Binary/BinaryHashCodeUtils.cs | 189 ++++++++
.../Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs | 18 +-
.../Impl/Client/Cache/CacheClient.cs | 172 +++++--
.../Cache/ClientCacheAffinityAwarenessGroup.cs | 110 +++++
.../Impl/Client/Cache/ClientCachePartitionMap.cs | 66 +++
.../Cache/ClientCacheTopologyPartitionMap.cs | 66 +++
.../Cache/ClientRendezvousAffinityFunction.cs | 50 ++
.../Impl/Client/ClientFailoverSocket.cs | 313 +++++++++++--
.../Jni/NativeMethod.cs => Client/ClientFlags.cs} | 33 +-
.../Apache.Ignite.Core/Impl/Client/ClientOp.cs | 1 +
.../Apache.Ignite.Core/Impl/Client/ClientSocket.cs | 81 +++-
.../Apache.Ignite.Core/Impl/Client/IgniteClient.cs | 8 +-
.../Impl/Client/SocketEndpoint.cs | 75 +++
.../Impl/Unmanaged/Jni/NativeMethod.cs | 1 +
27 files changed, 1880 insertions(+), 149 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 87042f6..26b28ff 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -132,6 +132,7 @@
<Compile Include="Cache\Query\Linq\CacheLinqTest.Contains.cs" />
<Compile Include="Cache\Store\CacheStoreSessionTestCodeConfig.cs" />
<Compile Include="Cache\Store\CacheStoreSessionTestSharedFactory.cs" />
+ <Compile Include="Client\Cache\AffinityAwarenessTest.cs" />
<Compile Include="Client\Cache\BinaryBuilderTest.cs" />
<Compile Include="Client\Cache\CacheClientAsyncWrapper.cs" />
<Compile Include="Client\Cache\CacheTest.cs" />
@@ -143,10 +144,13 @@
<Compile Include="Client\Cache\EmptyObject.cs" />
<Compile Include="Client\Cache\CreateCacheTest.cs" />
<Compile Include="Client\Cache\LinqTest.cs" />
+ <Compile Include="Client\Cache\ListLogger.cs" />
<Compile Include="Client\Cache\ScanQueryTest.cs" />
<Compile Include="Client\Cache\Person.cs" />
<Compile Include="Client\Cache\SqlQueryTest.cs" />
<Compile Include="Client\Cache\SqlQueryTestBase.cs" />
+ <Compile Include="Client\Cache\TestKey.cs" />
+ <Compile Include="Client\Cache\TestKeyWithAffinity.cs" />
<Compile Include="Client\ClientTestBase.cs" />
<Compile Include="Client\EndpointTest.cs" />
<Compile Include="Client\RawSecureSocketTest.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/AffinityAwarenessTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/AffinityAwarenessTest.cs
new file mode 100644
index 0000000..b800667
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/AffinityAwarenessTest.cs
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable RedundantCast
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.Linq;
+ using System.Net;
+ using System.Text.RegularExpressions;
+ using System.Threading.Tasks;
+ using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Client.Cache;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Events;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests affinity awareness functionality.
+ /// </summary>
+ public class AffinityAwarenessTest : ClientTestBase
+ {
+ /** */
+ private const string NodeIndexAttr = "test-node-idx";
+
+ /** */
+ private readonly List<ListLogger> _loggers = new List<ListLogger>();
+
+ /** */
+ private ICacheClient<int, int> _cache;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="AffinityAwarenessTest"/> class.
+ /// </summary>
+ public AffinityAwarenessTest() : base(3)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Fixture set up.
+ /// </summary>
+ public override void FixtureSetUp()
+ {
+ base.FixtureSetUp();
+
+ _cache = Client.CreateCache<int, int>("c");
+
+ // Warm up client partition data.
+ InitTestData();
+ _cache.Get(1);
+ _cache.Get(2);
+ }
+
+ public override void TestSetUp()
+ {
+ base.TestSetUp();
+
+ InitTestData();
+ ClearLoggers();
+ }
+
+ [Test]
+ [TestCase(1, 1)]
+ [TestCase(2, 0)]
+ [TestCase(3, 0)]
+ [TestCase(4, 1)]
+ [TestCase(5, 1)]
+ [TestCase(6, 2)]
+ public void CacheGet_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
+ {
+ var res = _cache.Get(key);
+
+ Assert.AreEqual(key, res);
+ Assert.AreEqual(gridIdx, GetClientRequestGridIndex());
+ }
+
+ [Test]
+ [TestCase(1, 1)]
+ [TestCase(2, 0)]
+ [TestCase(3, 0)]
+ [TestCase(4, 1)]
+ [TestCase(5, 1)]
+ [TestCase(6, 2)]
+ public void CacheGetAsync_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
+ {
+ var res = _cache.GetAsync(key).Result;
+
+ Assert.AreEqual(key, res);
+ Assert.AreEqual(gridIdx, GetClientRequestGridIndex());
+ }
+
+ [Test]
+ [TestCase(1, 0)]
+ [TestCase(2, 0)]
+ [TestCase(3, 0)]
+ [TestCase(4, 2)]
+ [TestCase(5, 2)]
+ [TestCase(6, 1)]
+ public void CacheGet_UserDefinedKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
+ {
+ var cache = Client.GetOrCreateCache<TestKey, int>("c_custom_key");
+ cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => new TestKey(x, x.ToString()), x => x));
+ cache.Get(new TestKey(1, "1")); // Warm up;
+
+ ClearLoggers();
+ var testKey = new TestKey(key, key.ToString());
+ var res = cache.Get(testKey);
+
+ Assert.AreEqual(key, res);
+ Assert.AreEqual(gridIdx, GetClientRequestGridIndex());
+ Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(testKey));
+ }
+
+ [Test]
+ public void CachePut_UserDefinedTypeWithAffinityKey_ThrowsIgniteException()
+ {
+ // Note: annotation-based configuration is not supported on Java side.
+ // Use manual configuration instead.
+ var cacheClientConfiguration = new CacheClientConfiguration("c_custom_key_aff")
+ {
+ KeyConfiguration = new List<CacheKeyConfiguration>
+ {
+ new CacheKeyConfiguration(typeof(TestKeyWithAffinity))
+ {
+ AffinityKeyFieldName = "_i"
+ }
+ }
+ };
+ var cache = Client.GetOrCreateCache<TestKeyWithAffinity, int>(cacheClientConfiguration);
+
+ var ex = Assert.Throws<IgniteException>(() => cache.Put(new TestKeyWithAffinity(1, "1"), 1));
+
+ var expected = string.Format("Affinity keys are not supported. Object '{0}' has an affinity key.",
+ typeof(TestKeyWithAffinity));
+
+ Assert.AreEqual(expected, ex.Message);
+ }
+
+ [Test]
+ public void CacheGet_NewNodeEnteredTopology_RequestIsRoutedToDefaultNode()
+ {
+ // Warm-up.
+ Assert.AreEqual(1, _cache.Get(1));
+
+ // Before topology change.
+ Assert.AreEqual(12, _cache.Get(12));
+ Assert.AreEqual(1, GetClientRequestGridIndex());
+
+ Assert.AreEqual(14, _cache.Get(14));
+ Assert.AreEqual(2, GetClientRequestGridIndex());
+
+ // After topology change.
+ var cfg = GetIgniteConfiguration();
+ cfg.AutoGenerateIgniteInstanceName = true;
+
+ using (var ignite = Ignition.Start(cfg))
+ {
+ // Wait for rebalance.
+ var events = ignite.GetEvents();
+ events.EnableLocal(EventType.CacheRebalanceStopped);
+ events.WaitForLocal(EventType.CacheRebalanceStopped);
+
+ // Warm-up.
+ Assert.AreEqual(1, _cache.Get(1));
+
+ // Assert: keys 12 and 14 belong to a new node now, but we don't have the new node in the server list.
+ // Requests are routed to default node.
+ Assert.AreEqual(12, _cache.Get(12));
+ Assert.AreEqual(1, GetClientRequestGridIndex());
+
+ Assert.AreEqual(14, _cache.Get(14));
+ Assert.AreEqual(1, GetClientRequestGridIndex());
+ }
+ }
+
+ [Test]
+ [TestCase(1, 1)]
+ [TestCase(2, 0)]
+ [TestCase(3, 0)]
+ [TestCase(4, 1)]
+ [TestCase(5, 1)]
+ [TestCase(6, 2)]
+ public void AllKeyBasedOperations_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key, int gridIdx)
+ {
+ int unused;
+
+ TestOperation(() => _cache.Get(key), gridIdx);
+ TestAsyncOperation(() => _cache.GetAsync(key), gridIdx);
+
+ TestOperation(() => _cache.TryGet(key, out unused), gridIdx);
+ TestAsyncOperation(() => _cache.TryGetAsync(key), gridIdx);
+
+ TestOperation(() => _cache.Put(key, key), gridIdx, "Put");
+ TestAsyncOperation(() => _cache.PutAsync(key, key), gridIdx, "Put");
+
+ TestOperation(() => _cache.PutIfAbsent(key, key), gridIdx, "PutIfAbsent");
+ TestAsyncOperation(() => _cache.PutIfAbsentAsync(key, key), gridIdx, "PutIfAbsent");
+
+ TestOperation(() => _cache.GetAndPutIfAbsent(key, key), gridIdx, "GetAndPutIfAbsent");
+ TestAsyncOperation(() => _cache.GetAndPutIfAbsentAsync(key, key), gridIdx, "GetAndPutIfAbsent");
+
+ TestOperation(() => _cache.Clear(key), gridIdx, "ClearKey");
+ TestAsyncOperation(() => _cache.ClearAsync(key), gridIdx, "ClearKey");
+
+ TestOperation(() => _cache.ContainsKey(key), gridIdx, "ContainsKey");
+ TestAsyncOperation(() => _cache.ContainsKeyAsync(key), gridIdx, "ContainsKey");
+
+ TestOperation(() => _cache.GetAndPut(key, key), gridIdx, "GetAndPut");
+ TestAsyncOperation(() => _cache.GetAndPutAsync(key, key), gridIdx, "GetAndPut");
+
+ TestOperation(() => _cache.GetAndReplace(key, key), gridIdx, "GetAndReplace");
+ TestAsyncOperation(() => _cache.GetAndReplaceAsync(key, key), gridIdx, "GetAndReplace");
+
+ TestOperation(() => _cache.GetAndRemove(key), gridIdx, "GetAndRemove");
+ TestAsyncOperation(() => _cache.GetAndRemoveAsync(key), gridIdx, "GetAndRemove");
+
+ TestOperation(() => _cache.Replace(key, key), gridIdx, "Replace");
+ TestAsyncOperation(() => _cache.ReplaceAsync(key, key), gridIdx, "Replace");
+
+ TestOperation(() => _cache.Replace(key, key, key + 1), gridIdx, "ReplaceIfEquals");
+ TestAsyncOperation(() => _cache.ReplaceAsync(key, key, key + 1), gridIdx, "ReplaceIfEquals");
+
+ TestOperation(() => _cache.Remove(key), gridIdx, "RemoveKey");
+ TestAsyncOperation(() => _cache.RemoveAsync(key), gridIdx, "RemoveKey");
+
+ TestOperation(() => _cache.Remove(key, key), gridIdx, "RemoveIfEquals");
+ TestAsyncOperation(() => _cache.RemoveAsync(key, key), gridIdx, "RemoveIfEquals");
+ }
+
+ [Test]
+ public void CacheGet_RepeatedCall_DoesNotRequestAffinityMapping()
+ {
+ // Test that affinity mapping is not requested when known.
+ // Start new cache to enforce partition mapping request.
+ Client.CreateCache<int, int>("repeat-call-test");
+ ClearLoggers();
+
+ _cache.Get(1);
+ _cache.Get(1);
+ _cache.Get(1);
+
+ var requests = GetCacheRequestNames(_loggers[1]).ToArray();
+
+ var expectedRequests = new[]
+ {
+ "Partitions",
+ "Get",
+ "Get",
+ "Get"
+ };
+
+ Assert.AreEqual(expectedRequests, requests);
+ }
+
+ [Test]
+ public void CacheGet_AffinityAwarenessDisabled_RequestIsRoutedToDefaultNode()
+ {
+ var cfg = GetClientConfiguration();
+ cfg.EnableAffinityAwareness = false;
+
+ using (var client = Ignition.StartClient(cfg))
+ {
+ var cache = client.GetCache<int, int>(_cache.Name);
+
+ var requestTargets = Enumerable
+ .Range(1, 10)
+ .Select(x =>
+ {
+ cache.Get(x);
+ return GetClientRequestGridIndex();
+ })
+ .Distinct()
+ .ToArray();
+
+ // Affinity awareness disabled - all requests go to same socket, picked with round-robin on connect.
+ Assert.AreEqual(1, requestTargets.Length);
+ }
+ }
+
+ [Test]
+ [TestCase(1, 1)]
+ [TestCase(2, 0)]
+ [TestCase((uint) 1, 1)]
+ [TestCase((uint) 2, 0)]
+ [TestCase((byte) 1, 1)]
+ [TestCase((byte) 2, 0)]
+ [TestCase((sbyte) 1, 1)]
+ [TestCase((sbyte) 2, 0)]
+ [TestCase((short) 1, 1)]
+ [TestCase((short) 2, 0)]
+ [TestCase((ushort) 1, 1)]
+ [TestCase((ushort) 2, 0)]
+ [TestCase((long) 1, 1)]
+ [TestCase((long) 2, 0)]
+ [TestCase((ulong) 1, 1)]
+ [TestCase((ulong) 2, 0)]
+ [TestCase((float) 1.3, 0)]
+ [TestCase((float) 1.4, 2)]
+ [TestCase((double) 51.3, 1)]
+ [TestCase((double) 415.5, 0)]
+ [TestCase((double) 325.5, 0)]
+ [TestCase((double) 255.5, 1)]
+ [TestCase('1', 2)]
+ [TestCase('2', 1)]
+ [TestCase(true, 1)]
+ [TestCase(false, 1)]
+ public void CachePut_AllPrimitiveTypes_RequestIsRoutedToPrimaryNode(object key, int gridIdx)
+ {
+ var cache = Client.GetCache<object, object>(_cache.Name);
+ TestOperation(() => cache.Put(key, key), gridIdx, "Put");
+
+ // Verify against real Affinity.
+ Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
+ }
+
+ [Test]
+ [TestCase("00000000-0000-0000-0000-000000000000", 0)]
+ [TestCase("0cb85a41-bd0d-405b-8f34-f515e8aabc39", 0)]
+ [TestCase("b4addd17-218c-4054-a5fa-03c88f5ee71c", 0)]
+ [TestCase("2611e3d2-618d-43b9-a318-2f5039f82568", 1)]
+ [TestCase("1dd8bfae-29b8-4949-aa99-7c9bfabe2566", 2)]
+ public void CachePut_GuidKey_RequestIsRoutedToPrimaryNode(string keyString, int gridIdx)
+ {
+ var key = Guid.Parse(keyString);
+
+ var cache = Client.GetCache<object, object>(_cache.Name);
+ TestOperation(() => cache.Put(key, key), gridIdx, "Put");
+
+ // Verify against real Affinity.
+ Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
+ }
+
+ [Test]
+ [TestCase("2015-01-01T00:00:00.0000000Z", 2)]
+ [TestCase("2016-02-02T00:00:00.0000000Z", 2)]
+ [TestCase("2017-03-03T00:00:00.0000000Z", 0)]
+ public void CachePut_DateTimeKey_RequestIsRoutedToPrimaryNode(string keyString, int gridIdx)
+ {
+ var key = DateTime.Parse(keyString, CultureInfo.InvariantCulture).ToUniversalTime();
+
+ var cache = Client.GetCache<object, object>(_cache.Name);
+ TestOperation(() => cache.Put(key, key), gridIdx, "Put");
+
+ // Verify against real Affinity.
+ Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
+ }
+
+ [Test]
+ [TestCase(1, 1)]
+ [TestCase(2, 0)]
+ public void CachePut_IntPtrKeyKey_RequestIsRoutedToPrimaryNode(int keyInt, int gridIdx)
+ {
+ var key = new IntPtr(keyInt);
+
+ var cache = Client.GetCache<object, object>(_cache.Name);
+ TestOperation(() => cache.Put(key, key), gridIdx, "Put");
+
+ // Verify against real Affinity.
+ Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
+ }
+
+ [Test]
+ [TestCase(1, 1)]
+ [TestCase(2, 0)]
+ public void CachePut_UIntPtrKeyKey_RequestIsRoutedToPrimaryNode(int keyInt, int gridIdx)
+ {
+ var key = new UIntPtr((uint) keyInt);
+
+ var cache = Client.GetCache<object, object>(_cache.Name);
+ TestOperation(() => cache.Put(key, key), gridIdx, "Put");
+
+ // Verify against real Affinity.
+ Assert.AreEqual(gridIdx, GetPrimaryNodeIdx(key));
+ }
+
+ protected override IgniteConfiguration GetIgniteConfiguration()
+ {
+ var cfg = base.GetIgniteConfiguration();
+
+ var index = _loggers.Count;
+ cfg.UserAttributes = new Dictionary<string, object> {{NodeIndexAttr, index}};
+
+ var logger = new ListLogger();
+ cfg.Logger = logger;
+ _loggers.Add(logger);
+
+ return cfg;
+ }
+
+ protected override IgniteClientConfiguration GetClientConfiguration()
+ {
+ var cfg = base.GetClientConfiguration();
+
+ cfg.EnableAffinityAwareness = true;
+ cfg.Endpoints.Add(string.Format("{0}:{1}", IPAddress.Loopback, IgniteClientConfiguration.DefaultPort + 1));
+ cfg.Endpoints.Add(string.Format("{0}:{1}", IPAddress.Loopback, IgniteClientConfiguration.DefaultPort + 2));
+
+ return cfg;
+ }
+
+ private int GetClientRequestGridIndex(string message = null)
+ {
+ message = message ?? "Get";
+
+ try
+ {
+ for (var i = 0; i < _loggers.Count; i++)
+ {
+ var requests = GetCacheRequestNames(_loggers[i]);
+
+ if (requests.Contains(message))
+ {
+ return i;
+ }
+ }
+
+ return -1;
+ }
+ finally
+ {
+ ClearLoggers();
+ }
+ }
+
+ private static IEnumerable<string> GetCacheRequestNames(ListLogger logger)
+ {
+ var messageRegex = new Regex(
+ @"Client request received \[reqId=\d+, addr=/127.0.0.1:\d+, " +
+ @"req=org.apache.ignite.internal.processors.platform.client.cache.ClientCache(\w+)Request@");
+
+ return logger.Messages
+ .Select(m => messageRegex.Match(m))
+ .Where(m => m.Success)
+ .Select(m => m.Groups[1].Value);
+ }
+
+
+ private void ClearLoggers()
+ {
+ foreach (var logger in _loggers)
+ {
+ logger.Clear();
+ }
+ }
+
+ private void TestOperation(Action action, int expectedGridIdx, string message = null)
+ {
+ InitTestData();
+ ClearLoggers();
+ action();
+ Assert.AreEqual(expectedGridIdx, GetClientRequestGridIndex(message));
+ }
+
+ private void TestAsyncOperation<T>(Func<T> action, int expectedGridIdx, string message = null)
+ where T : Task
+ {
+ ClearLoggers();
+ action().Wait();
+ Assert.AreEqual(expectedGridIdx, GetClientRequestGridIndex(message));
+ }
+
+ private void InitTestData()
+ {
+ _cache.PutAll(Enumerable.Range(1, 100).ToDictionary(x => x, x => x));
+ }
+
+ private int GetPrimaryNodeIdx<T>(T key)
+ {
+ var idx = 0;
+
+ // GetAll is not ordered - sort the same way as _loggers.
+ var ignites = Ignition.GetAll()
+ .OrderBy(n => n.GetCluster().GetLocalNode().GetAttribute<int>(NodeIndexAttr));
+
+ foreach (var ignite in ignites)
+ {
+ var aff = ignite.GetAffinity(_cache.Name);
+ var localNode = ignite.GetCluster().GetLocalNode();
+
+ if (aff.IsPrimary(localNode, key))
+ {
+ return idx;
+ }
+
+ idx++;
+ }
+
+ throw new InvalidOperationException("Can't determine primary node");
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ListLogger.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ListLogger.cs
new file mode 100644
index 0000000..849458e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/ListLogger.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Apache.Ignite.Core.Log;
+
+ public class ListLogger : ILogger
+ {
+ /** */
+ private readonly List<string> _messages = new List<string>();
+
+ /** */
+ private readonly object _lock = new object();
+
+ public List<string> Messages
+ {
+ get
+ {
+ lock (_lock)
+ {
+ return _messages.ToList();
+ }
+ }
+ }
+
+ public void Clear()
+ {
+ lock (_lock)
+ {
+ _messages.Clear();
+ }
+ }
+
+ public void Log(LogLevel level, string message, object[] args, IFormatProvider formatProvider, string category,
+ string nativeErrorInfo, Exception ex)
+ {
+ lock (_lock)
+ {
+ _messages.Add(message);
+ }
+ }
+
+ public bool IsEnabled(LogLevel level)
+ {
+ return level == LogLevel.Debug;
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/TestKey.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/TestKey.cs
new file mode 100644
index 0000000..242969b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/TestKey.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ public sealed class TestKey
+ {
+ private readonly int _i;
+ private readonly string _s;
+
+ public TestKey(int i, string s)
+ {
+ _i = i;
+ _s = s;
+ }
+
+ private bool Equals(TestKey other)
+ {
+ return _i == other._i && string.Equals(_s, other._s);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != GetType()) return false;
+ return Equals((TestKey) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return (_i * 397) ^ (_s != null ? _s.GetHashCode() : 0);
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/TestKeyWithAffinity.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/TestKeyWithAffinity.cs
new file mode 100644
index 0000000..785c0ab
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/TestKeyWithAffinity.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Client.Cache
+{
+ using Apache.Ignite.Core.Cache.Affinity;
+
+ public sealed class TestKeyWithAffinity
+ {
+ [AffinityKeyMapped]
+ private readonly int _i;
+
+ private readonly string _s;
+
+ public TestKeyWithAffinity(int i, string s)
+ {
+ _i = i;
+ _s = s;
+ }
+
+ private bool Equals(TestKeyWithAffinity other)
+ {
+ return _i == other._i && string.Equals(_s, other._s);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != GetType()) return false;
+ return Equals((TestKeyWithAffinity) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return (_i * 397) ^ (_s != null ? _s.GetHashCode() : 0);
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
index 0c202bc..a882176 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientConnectionTest.cs
@@ -292,7 +292,6 @@ namespace Apache.Ignite.Core.Tests.Client
ClientConnectorConfiguration.DefaultPort,
AddressFamily.InterNetwork),
null,
- null,
new ClientProtocolVersion(-1, -1, -1)));
Assert.AreEqual(ClientStatusCode.Fail, ex.StatusCode);
@@ -532,9 +531,7 @@ namespace Apache.Ignite.Core.Tests.Client
Assert.IsNotNull(GetSocketException(ex));
// Second request causes reconnect attempt which fails (server is stopped).
- var aex = Assert.Throws<AggregateException>(() => client.GetCacheNames());
- Assert.AreEqual("Failed to establish Ignite thin client connection, " +
- "examine inner exceptions for details.", aex.Message.Substring(0, 88));
+ Assert.Catch(() => client.GetCacheNames());
// Start server, next operation succeeds.
Ignition.Start(TestUtils.GetTestConfiguration());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
index c6aee32..9e9db43 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/ClientTestBase.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Core.Tests.Client
{
using System;
+ using System.Collections.Generic;
using System.Linq;
using System.Net;
using Apache.Ignite.Core.Binary;
@@ -55,18 +56,19 @@ namespace Apache.Ignite.Core.Tests.Client
}
/// <summary>
- /// Fixture tear down.
+ /// Fixture set up.
/// </summary>
[TestFixtureSetUp]
- public void FixtureSetUp()
+ public virtual void FixtureSetUp()
{
var cfg = GetIgniteConfiguration();
Ignition.Start(cfg);
- cfg.AutoGenerateIgniteInstanceName = true;
-
for (var i = 1; i < _gridCount; i++)
{
+ cfg = GetIgniteConfiguration();
+ cfg.AutoGenerateIgniteInstanceName = true;
+
Ignition.Start(cfg);
}
@@ -140,7 +142,7 @@ namespace Apache.Ignite.Core.Tests.Client
{
return new IgniteClientConfiguration
{
- Endpoints = new[] {IPAddress.Loopback.ToString()}
+ Endpoints = new List<string> { IPAddress.Loopback.ToString() }
};
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs
index a113193..b9987bc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/IgniteClientConfigurationTest.cs
@@ -91,7 +91,8 @@ namespace Apache.Ignite.Core.Tests.Client
"foo",
"bar:123",
"baz:100..103"
- }
+ },
+ EnableAffinityAwareness = true
};
using (var xmlReader = XmlReader.Create(Path.Combine("Config", "Client", "IgniteClientConfiguration.xml")))
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml
index 291fcaf..29db47c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Client/IgniteClientConfiguration.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
@@ -18,24 +18,24 @@
-->
<igniteClientConfiguration host="test1" port="345" socketReceiveBufferSize="222" socketSendBufferSize="333"
- tcpNoDelay="false" socketTimeout="0:0:15">
- <binaryConfiguration compactFooter="false" keepDeserialized="false">
- <types>
- <string>foo</string>
- <string>bar</string>
- </types>
- </binaryConfiguration>
+ tcpNoDelay="false" socketTimeout="0:0:15" enableAffinityAwareness="true">
+ <binaryConfiguration compactFooter="false" keepDeserialized="false">
+ <types>
+ <string>foo</string>
+ <string>bar</string>
+ </types>
+ </binaryConfiguration>
- <sslStreamFactory type="Apache.Ignite.Core.Client.SslStreamFactory"
- certificatePath="abc.pfx"
- certificatePassword="foo"
- checkCertificateRevocation="true"
- skipServerCertificateValidation="true"
- sslProtocols="None" />
+ <sslStreamFactory type="Apache.Ignite.Core.Client.SslStreamFactory"
+ certificatePath="abc.pfx"
+ certificatePassword="foo"
+ checkCertificateRevocation="true"
+ skipServerCertificateValidation="true"
+ sslProtocols="None" />
- <endpoints>
- <string>foo</string>
- <string>bar:123</string>
- <string>baz:100..103</string>
- </endpoints>
+ <endpoints>
+ <string>foo</string>
+ <string>bar:123</string>
+ <string>baz:100..103</string>
+ </endpoints>
</igniteClientConfiguration>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index b99f2a8..303d6a6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -79,8 +79,15 @@
<Compile Include="Failure\NoOpFailureHandler.cs" />
<Compile Include="Failure\StopNodeFailureHandler.cs" />
<Compile Include="Failure\StopNodeOrHaltFailureHandler.cs" />
+ <Compile Include="Impl\Binary\BinaryHashCodeUtils.cs" />
<Compile Include="Impl\Cache\QueryMetricsImpl.cs" />
+ <Compile Include="Impl\Client\Cache\ClientCacheAffinityAwarenessGroup.cs" />
+ <Compile Include="Impl\Client\Cache\ClientCachePartitionMap.cs" />
+ <Compile Include="Impl\Client\Cache\ClientCacheTopologyPartitionMap.cs" />
+ <Compile Include="Impl\Client\Cache\ClientRendezvousAffinityFunction.cs" />
+ <Compile Include="Impl\Client\ClientFlags.cs" />
<Compile Include="Impl\Client\Endpoint.cs" />
+ <Compile Include="Impl\Client\SocketEndpoint.cs" />
<Compile Include="Impl\Common\TaskRunner.cs" />
<Compile Include="Impl\Transactions\TransactionCollectionImpl.cs" />
<Compile Include="Ssl\ISslContextFactory.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
index 9bfdfb4..1f9436a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs
@@ -23,7 +23,7 @@ namespace Apache.Ignite.Core.Cache.Affinity
/// <summary>
/// Affinity topology version.
/// </summary>
- public struct AffinityTopologyVersion : IEquatable<AffinityTopologyVersion>
+ public struct AffinityTopologyVersion : IEquatable<AffinityTopologyVersion>, IComparable<AffinityTopologyVersion>
{
/** */
private readonly long _version;
@@ -101,8 +101,8 @@ namespace Apache.Ignite.Core.Cache.Affinity
/// <summary>
/// Implements the operator ==.
/// </summary>
- /// <param name="left">The left.</param>
- /// <param name="right">The right.</param>
+ /// <param name="left">The left operand.</param>
+ /// <param name="right">The right operand.</param>
/// <returns>
/// The result of the operator.
/// </returns>
@@ -114,8 +114,8 @@ namespace Apache.Ignite.Core.Cache.Affinity
/// <summary>
/// Implements the operator !=.
/// </summary>
- /// <param name="left">The left.</param>
- /// <param name="right">The right.</param>
+ /// <param name="left">The left operand.</param>
+ /// <param name="right">The right operand.</param>
/// <returns>
/// The result of the operator.
/// </returns>
@@ -125,6 +125,58 @@ namespace Apache.Ignite.Core.Cache.Affinity
}
/// <summary>
+ /// Implements the operator 'less than'.
+ /// </summary>
+ /// <param name="left">The left operand.</param>
+ /// <param name="right">The right operand.</param>
+ /// <returns>
+ /// The result of the operator.
+ /// </returns>
+ public static bool operator <(AffinityTopologyVersion left, AffinityTopologyVersion right)
+ {
+ return left.CompareTo(right) < 0;
+ }
+
+ /// <summary>
+ /// Implements the operator 'greater than'.
+ /// </summary>
+ /// <param name="left">The left operand.</param>
+ /// <param name="right">The right operand.</param>
+ /// <returns>
+ /// The result of the operator.
+ /// </returns>
+ public static bool operator >(AffinityTopologyVersion left, AffinityTopologyVersion right)
+ {
+ return left.CompareTo(right) > 0;
+ }
+
+ /// <summary>
+ /// Implements the operator 'less or equal than'.
+ /// </summary>
+ /// <param name="left">The left operand.</param>
+ /// <param name="right">The right operand.</param>
+ /// <returns>
+ /// The result of the operator.
+ /// </returns>
+ public static bool operator <=(AffinityTopologyVersion left, AffinityTopologyVersion right)
+ {
+ return left.CompareTo(right) <= 0;
+ }
+
+ /// <summary>
+ /// Implements the operator 'greater or equal than'.
+ /// </summary>
+ /// <param name="left">The left operand.</param>
+ /// <param name="right">The right operand.</param>
+ /// <returns>
+ /// The result of the operator.
+ /// </returns>
+ public static bool operator >=(AffinityTopologyVersion left, AffinityTopologyVersion right)
+ {
+ return left.CompareTo(right) >= 0;
+ }
+
+ /// <summary>
/// Returns a <see cref="string" /> that represents this instance.
/// </summary>
/// <returns>
@@ -134,5 +186,19 @@ namespace Apache.Ignite.Core.Cache.Affinity
{
return string.Format("AffinityTopologyVersion [Version={0}, MinorVersion={1}]", _version, _minorVersion);
}
+
+ /// <summary>
+ /// Compares the current instance with another object of the same type and returns an integer that indicates
+ /// whether the current instance precedes, follows,
+ /// or occurs in the same position in the sort order as the other object.
+ /// </summary>
+ public int CompareTo(AffinityTopologyVersion other)
+ {
+ var versionComparison = _version.CompareTo(other._version);
+
+ return versionComparison != 0
+ ? versionComparison
+ : _minorVersion.CompareTo(other._minorVersion);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
index 311dfbe..222d1d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Client/IgniteClientConfiguration.cs
@@ -113,6 +113,7 @@ namespace Apache.Ignite.Core.Client
Password = cfg.Password;
Endpoints = cfg.Endpoints == null ? null : cfg.Endpoints.ToList();
ReconnectDisabled = cfg.ReconnectDisabled;
+ EnableAffinityAwareness = cfg.EnableAffinityAwareness;
}
/// <summary>
@@ -203,6 +204,16 @@ namespace Apache.Ignite.Core.Client
public string Password { get; set; }
/// <summary>
+ /// Gets or sets a value indicating whether affinity awareness should be enabled.
+ /// <para />
+ /// Default is false: only one connection is established at a given moment to a random server node.
+ /// When true: for cache operations, Ignite client attempts to send the request directly to
+ /// the primary node for the given cache key.
+ /// To do so, connection is established to every known server node at all times.
+ /// </summary>
+ public bool EnableAffinityAwareness { get; set; }
+
+ /// <summary>
/// Gets or sets custom binary processor. Internal property for tests.
/// </summary>
internal IBinaryProcessor BinaryProcessor { get; set; }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
index d7ebc12..a47eb98 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteClientConfigurationSection.xsd
@@ -252,6 +252,11 @@
<xs:documentation>Disables automatic reconnect on network or server failure.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="enableAffinityAwareness" type="xs:boolean">
+ <xs:annotation>
+ <xs:documentation>Enables affinity-aware connection: client will establish connection to every known server and route requests to primary nodes for cache operations.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
<xs:attribute name="userName" type="xs:string">
<xs:annotation>
<xs:documentation>Username to be used to connect to secured cluster.</xs:documentation>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHashCodeUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHashCodeUtils.cs
new file mode 100644
index 0000000..de09fdd
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryHashCodeUtils.cs
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Binary
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Common;
+
+ /// <summary>
+ /// Utilities for binary hash codes.
+ /// </summary>
+ internal static class BinaryHashCodeUtils
+ {
+ /// <summary>
+ /// Gets the Ignite-specific hash code for the provided value.
+ /// </summary>
+ public static unsafe int GetHashCode<T>(T val, Marshaller marsh, IDictionary<int, int> affinityKeyFieldIds)
+ {
+ Debug.Assert(marsh != null);
+ Debug.Assert(val != null);
+
+ var type = val.GetType();
+
+ if (type == typeof(int))
+ return TypeCaster<int>.Cast(val);
+
+ if (type == typeof(long))
+ return GetLongHashCode(TypeCaster<long>.Cast(val));
+
+ if (type == typeof(bool))
+ return TypeCaster<bool>.Cast(val) ? 1231 : 1237;
+
+ if (type == typeof(byte))
+ return TypeCaster<byte>.Cast(val);
+
+ if (type == typeof(short))
+ return TypeCaster<short>.Cast(val);
+
+ if (type == typeof(char))
+ return TypeCaster<char>.Cast(val);
+
+ if (type == typeof(float))
+ {
+ var floatVal = TypeCaster<float>.Cast(val);
+ return *(int*) &floatVal;
+ }
+
+ if (type == typeof(double))
+ {
+ var doubleVal = TypeCaster<double>.Cast(val);
+ return GetLongHashCode(*(long*) &doubleVal);
+ }
+
+ if (type == typeof(sbyte))
+ {
+ var val0 = TypeCaster<sbyte>.Cast(val);
+ return *(byte*) &val0;
+ }
+
+ if (type == typeof(ushort))
+ {
+ var val0 = TypeCaster<ushort>.Cast(val);
+ return *(short*) &val0;
+ }
+
+ if (type == typeof(uint))
+ {
+ var val0 = TypeCaster<uint>.Cast(val);
+ return *(int*) &val0;
+ }
+
+ if (type == typeof(ulong))
+ {
+ var val0 = TypeCaster<ulong>.Cast(val);
+ return GetLongHashCode(*(long*) &val0);
+ }
+
+ if (type == typeof(IntPtr))
+ {
+ var val0 = TypeCaster<IntPtr>.Cast(val).ToInt64();
+ return GetLongHashCode(val0);
+ }
+
+ if (type == typeof(UIntPtr))
+ {
+ var val0 = TypeCaster<UIntPtr>.Cast(val).ToUInt64();
+ return GetLongHashCode(*(long*) &val0);
+ }
+
+ if (type == typeof(Guid))
+ {
+ return GetGuidHashCode(TypeCaster<Guid>.Cast(val));
+ }
+
+ // DateTime, when used as key, is always written as BinaryObject.
+ return GetComplexTypeHashCode(val, marsh, affinityKeyFieldIds);
+ }
+
+ private static int GetComplexTypeHashCode<T>(T val, Marshaller marsh, IDictionary<int, int> affinityKeyFieldIds)
+ {
+ using (var stream = new BinaryHeapStream(128))
+ {
+ var writer = marsh.StartMarshal(stream);
+
+ int? hashCode = null;
+
+ writer.OnObjectWritten += (header, obj) =>
+ {
+ if (affinityKeyFieldIds != null && affinityKeyFieldIds.ContainsKey(header.TypeId))
+ {
+ var err = string.Format(
+ "Affinity keys are not supported. Object '{0}' has an affinity key.", obj);
+
+ throw new IgniteException(err);
+ }
+
+ // In case of composite objects we need the last hash code.
+ hashCode = header.HashCode;
+ };
+
+ writer.Write(val);
+
+ if (hashCode != null)
+ {
+ // ReSharper disable once PossibleInvalidOperationException (false detection).
+ return hashCode.Value;
+ }
+
+ throw new IgniteException(string.Format("Failed to compute hash code for object '{0}'", val));
+ }
+ }
+
+ private static int GetLongHashCode(long longVal)
+ {
+ return (int) (longVal ^ ((longVal >> 32) & 0xFFFFFFFF));
+ }
+
+ private static unsafe int GetGuidHashCode(Guid val)
+ {
+ var bytes = val.ToByteArray();
+ byte* jBytes = stackalloc byte[16];
+
+ jBytes[0] = bytes[6]; // c1
+ jBytes[1] = bytes[7]; // c2
+
+ jBytes[2] = bytes[4]; // b1
+ jBytes[3] = bytes[5]; // b2
+
+ jBytes[4] = bytes[0]; // a1
+ jBytes[5] = bytes[1]; // a2
+ jBytes[6] = bytes[2]; // a3
+ jBytes[7] = bytes[3]; // a4
+
+ jBytes[8] = bytes[15]; // k
+ jBytes[9] = bytes[14]; // j
+ jBytes[10] = bytes[13]; // i
+ jBytes[11] = bytes[12]; // h
+ jBytes[12] = bytes[11]; // g
+ jBytes[13] = bytes[10]; // f
+ jBytes[14] = bytes[9]; // e
+ jBytes[15] = bytes[8]; // d
+
+ var hi = *(long*) &jBytes[0];
+ var lo = *(long*) &jBytes[8];
+
+ var hilo = hi ^ lo;
+
+ return (int) (hilo ^ ((hilo >> 32) & 0xFFFFFFFF));
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
index 55668c4..4abac0a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriter.cs
@@ -69,6 +69,11 @@ namespace Apache.Ignite.Core.Impl.Binary
}
/// <summary>
+ /// Invoked when binary object writing finishes.
+ /// </summary>
+ internal event Action<BinaryObjectHeader, object> OnObjectWritten;
+
+ /// <summary>
/// Write named boolean value.
/// </summary>
/// <param name="fieldName">Field name.</param>
@@ -1261,14 +1266,19 @@ namespace Apache.Ignite.Core.Impl.Binary
var len = _stream.Position - pos;
- var hashCode = BinaryArrayEqualityComparer.GetHashCode(Stream, pos + BinaryObjectHeader.Size,
- dataEnd - pos - BinaryObjectHeader.Size);
+ var hashCode = BinaryArrayEqualityComparer.GetHashCode(Stream, pos + BinaryObjectHeader.Size,
+ dataEnd - pos - BinaryObjectHeader.Size);
- var header = new BinaryObjectHeader(desc.IsRegistered ? desc.TypeId : BinaryTypeId.Unregistered,
- hashCode, len, schemaId, schemaOffset, flags);
+ var header = new BinaryObjectHeader(desc.IsRegistered ? desc.TypeId : BinaryTypeId.Unregistered,
+ hashCode, len, schemaId, schemaOffset, flags);
BinaryObjectHeader.Write(header, _stream, pos);
+ if (OnObjectWritten != null)
+ {
+ OnObjectWritten(header, obj);
+ }
+
Stream.Seek(pos + len, SeekOrigin.Begin); // Seek to the end
}
finally
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
index 99b4f34..c50e031 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/CacheClient.cs
@@ -96,7 +96,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOp(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalNotNull<TV>);
+ return DoOutInOpAffinity(ClientOp.CacheGet, key, UnmarshalNotNull<TV>);
}
/** <inheritDoc /> */
@@ -104,7 +104,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalNotNull<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, w => w.WriteObjectDetached(key), UnmarshalNotNull<TV>);
}
/** <inheritDoc /> */
@@ -112,7 +112,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- var res = DoOutInOp(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalCacheResult<TV>);
+ var res = DoOutInOpAffinity(ClientOp.CacheGet, key, UnmarshalCacheResult<TV>);
value = res.Value;
@@ -124,7 +124,8 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAsync(ClientOp.CacheGet, w => w.WriteObject(key), UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGet, key, w => w.WriteObjectDetached(key),
+ UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -149,7 +150,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- DoOutOp(ClientOp.CachePut, w => WriteKeyVal(w, key, val));
+ DoOutOpAffinity(ClientOp.CachePut, key, val);
}
/** <inheritDoc /> */
@@ -158,7 +159,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutOpAsync(ClientOp.CachePut, w => WriteKeyVal(w, key, val));
+ return DoOutOpAffinityAsync(ClientOp.CachePut, key, w => WriteKeyVal(w, key, val));
}
/** <inheritDoc /> */
@@ -166,7 +167,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOp(ClientOp.CacheContainsKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheContainsKey, key, r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -174,7 +175,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAsync(ClientOp.CacheContainsKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheContainsKey, key, r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -243,7 +244,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinity(ClientOp.CacheGetAndPut, key, val, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -252,7 +253,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAsync(ClientOp.CacheGetAndPut, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGetAndPut, key, val, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -261,7 +262,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinity(ClientOp.CacheGetAndReplace, key, val, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -270,7 +271,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAsync(ClientOp.CacheGetAndReplace, w => WriteKeyVal(w, key, val), UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGetAndReplace, key, val, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -278,8 +279,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOp(ClientOp.CacheGetAndRemove, w => w.WriteObjectDetached(key),
- UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinity(ClientOp.CacheGetAndRemove, key, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -287,8 +287,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAsync(ClientOp.CacheGetAndRemove, w => w.WriteObjectDetached(key),
- UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGetAndRemove, key, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -297,7 +296,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CachePutIfAbsent, key, val, s => s.ReadBool());
}
/** <inheritDoc /> */
@@ -306,7 +305,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAsync(ClientOp.CachePutIfAbsent, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CachePutIfAbsent, key, val, s => s.ReadBool());
}
/** <inheritDoc /> */
@@ -315,8 +314,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val),
- UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinity(ClientOp.CacheGetAndPutIfAbsent, key, val, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -325,8 +323,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAsync(ClientOp.CacheGetAndPutIfAbsent, w => WriteKeyVal(w, key, val),
- UnmarshalCacheResult<TV>);
+ return DoOutInOpAffinityAsync(ClientOp.CacheGetAndPutIfAbsent, key, val, UnmarshalCacheResult<TV>);
}
/** <inheritDoc /> */
@@ -335,7 +332,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheReplace, key, val, s => s.ReadBool());
}
/** <inheritDoc /> */
@@ -344,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAsync(ClientOp.CacheReplace, w => WriteKeyVal(w, key, val), s => s.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheReplace, key, val, s => s.ReadBool());
}
/** <inheritDoc /> */
@@ -354,7 +351,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- return DoOutInOp(ClientOp.CacheReplaceIfEquals, w =>
+ return DoOutInOpAffinity(ClientOp.CacheReplaceIfEquals, key, w =>
{
w.WriteObjectDetached(key);
w.WriteObjectDetached(oldVal);
@@ -369,7 +366,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(oldVal, "oldVal");
IgniteArgumentCheck.NotNull(newVal, "newVal");
- return DoOutInOpAsync(ClientOp.CacheReplaceIfEquals, w =>
+ return DoOutInOpAffinityAsync(ClientOp.CacheReplaceIfEquals, key, w =>
{
w.WriteObjectDetached(key);
w.WriteObjectDetached(oldVal);
@@ -410,7 +407,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- DoOutOp(ClientOp.CacheClearKey, w => w.WriteObjectDetached(key));
+ DoOutOpAffinity(ClientOp.CacheClearKey, key);
}
/** <inheritDoc /> */
@@ -418,7 +415,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutOpAsync(ClientOp.CacheClearKey, w => w.WriteObjectDetached(key));
+ return DoOutOpAffinityAsync(ClientOp.CacheClearKey, key, w => w.WriteObjectDetached(key));
}
/** <inheritDoc /> */
@@ -442,7 +439,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOp(ClientOp.CacheRemoveKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheRemoveKey, key, r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -450,7 +447,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
{
IgniteArgumentCheck.NotNull(key, "key");
- return DoOutInOpAsync(ClientOp.CacheRemoveKey, w => w.WriteObjectDetached(key), r => r.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheRemoveKey, key, r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -459,7 +456,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOp(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool());
+ return DoOutInOpAffinity(ClientOp.CacheRemoveIfEquals, key, val, r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -468,7 +465,7 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
IgniteArgumentCheck.NotNull(key, "key");
IgniteArgumentCheck.NotNull(val, "val");
- return DoOutInOpAsync(ClientOp.CacheRemoveIfEquals, w => WriteKeyVal(w, key, val), r => r.ReadBool());
+ return DoOutInOpAffinityAsync(ClientOp.CacheRemoveIfEquals, key, val, r => r.ReadBool());
}
/** <inheritDoc /> */
@@ -562,7 +559,23 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/// <summary>
- /// Does the out op.
+ /// Does the out op with affinity awareness.
+ /// </summary>
+ private void DoOutOpAffinity(ClientOp opId, TK key)
+ {
+ DoOutInOpAffinity<object>(opId, key, null);
+ }
+
+ /// <summary>
+ /// Does the out op with affinity awareness.
+ /// </summary>
+ private void DoOutOpAffinity(ClientOp opId, TK key, TV val)
+ {
+ DoOutInOpAffinity<object>(opId, key, val, null);
+ }
+
+ /// <summary>
+ /// Does the out op with affinity awareness.
/// </summary>
private Task DoOutOpAsync(ClientOp opId, Action<BinaryWriter> writeAction = null)
{
@@ -570,6 +583,14 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/// <summary>
+ /// Does the out op with affinity awareness.
+ /// </summary>
+ private Task DoOutOpAffinityAsync(ClientOp opId, TK key, Action<BinaryWriter> writeAction = null)
+ {
+ return DoOutInOpAffinityAsync<object>(opId, key, writeAction, null);
+ }
+
+ /// <summary>
/// Does the out in op.
/// </summary>
private T DoOutInOp<T>(ClientOp opId, Action<BinaryWriter> writeAction,
@@ -580,6 +601,53 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/// <summary>
+ /// Does the out in op with affinity awareness.
+ /// </summary>
+ private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAffinity(
+ opId,
+ stream => WriteRequest(w => w.WriteObjectDetached(key), stream),
+ readFunc,
+ _id,
+ key,
+ HandleError<T>);
+ }
+
+ /// <summary>
+ /// Does the out in op with affinity awareness.
+ /// </summary>
+ private T DoOutInOpAffinity<T>(ClientOp opId, TK key, Action<BinaryWriter> writeAction,
+ Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAffinity(
+ opId,
+ stream => WriteRequest(writeAction, stream),
+ readFunc,
+ _id,
+ key,
+ HandleError<T>);
+ }
+
+ /// <summary>
+ /// Does the out in op with affinity awareness.
+ /// </summary>
+ private T DoOutInOpAffinity<T>(ClientOp opId, TK key, TV val, Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAffinity(
+ opId,
+ stream => WriteRequest(w =>
+ {
+ w.WriteObjectDetached(key);
+ w.WriteObjectDetached(val);
+ }, stream),
+ readFunc,
+ _id,
+ key,
+ HandleError<T>);
+ }
+
+ /// <summary>
/// Does the out in op.
/// </summary>
private Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<BinaryWriter> writeAction,
@@ -590,6 +658,44 @@ namespace Apache.Ignite.Core.Impl.Client.Cache
}
/// <summary>
+ /// Does the out in op with affinity awareness.
+ /// </summary>
+ private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Action<BinaryWriter> writeAction,
+ Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAffinityAsync(opId, stream => WriteRequest(writeAction, stream),
+ readFunc, _id, key, HandleError<T>);
+ }
+
+ /// <summary>
+ /// Does the out in op with affinity awareness.
+ /// </summary>
+ private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, TV val, Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAffinityAsync(
+ opId,
+ stream => WriteRequest(w =>
+ {
+ w.WriteObjectDetached(key);
+ w.WriteObjectDetached(val);
+ }, stream),
+ readFunc,
+ _id,
+ key,
+ HandleError<T>);
+ }
+
+ /// <summary>
+ /// Does the out in op with affinity awareness.
+ /// </summary>
+ private Task<T> DoOutInOpAffinityAsync<T>(ClientOp opId, TK key, Func<IBinaryStream, T> readFunc)
+ {
+ return _ignite.Socket.DoOutInOpAffinityAsync(opId,
+ stream => WriteRequest(w => w.WriteObjectDetached(key), stream),
+ readFunc, _id, key, HandleError<T>);
+ }
+
+ /// <summary>
/// Writes the request.
/// </summary>
private void WriteRequest(Action<BinaryWriter> writeAction, IBinaryStream stream)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCacheAffinityAwarenessGroup.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCacheAffinityAwarenessGroup.cs
new file mode 100644
index 0000000..f245f9c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCacheAffinityAwarenessGroup.cs
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.Cache
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Impl.Binary;
+ using Apache.Ignite.Core.Impl.Binary.IO;
+
+ /// <summary>
+ /// Partition mapping associated with the group of caches.
+ /// Mirrors corresponding Java class.
+ /// </summary>
+ internal class ClientCacheAffinityAwarenessGroup
+ {
+ /** */
+ private readonly List<KeyValuePair<Guid, List<int>>> _partitionMap;
+
+ /** */
+ private readonly List<KeyValuePair<int, Dictionary<int, int>>> _caches;
+
+ public ClientCacheAffinityAwarenessGroup(IBinaryStream stream)
+ {
+ // Whether this group is eligible for client-side partition awareness.
+ var applicable = stream.ReadBool();
+
+ var cachesCount = stream.ReadInt();
+ _caches = new List<KeyValuePair<int, Dictionary<int, int>>>(cachesCount);
+
+ for (var i = 0; i < cachesCount; i++)
+ {
+ var cacheId = stream.ReadInt();
+ if (!applicable)
+ {
+ _caches.Add(new KeyValuePair<int, Dictionary<int, int>>(cacheId, null));
+ continue;
+ }
+
+ var keyCfgCount = stream.ReadInt();
+ Dictionary<int, int> keyCfgs = null;
+ if (keyCfgCount > 0)
+ {
+ keyCfgs = new Dictionary<int, int>(keyCfgCount);
+ for (var j = 0; j < keyCfgCount; j++)
+ {
+ keyCfgs[stream.ReadInt()] = stream.ReadInt();
+ }
+ }
+
+ _caches.Add(new KeyValuePair<int, Dictionary<int, int>>(cacheId, keyCfgs));
+ }
+
+ if (!applicable)
+ return;
+
+ var partMapSize = stream.ReadInt();
+ _partitionMap = new List<KeyValuePair<Guid, List<int>>>(partMapSize);
+
+ var reader = BinaryUtils.Marshaller.StartUnmarshal(stream);
+
+ for (var i = 0; i < partMapSize; i++)
+ {
+ var nodeId = reader.ReadGuid();
+ Debug.Assert(nodeId != null);
+
+ var partCount = stream.ReadInt();
+ var parts = new List<int>(partCount);
+
+ for (int j = 0; j < partCount; j++)
+ {
+ parts.Add(stream.ReadInt());
+ }
+
+ _partitionMap.Add(new KeyValuePair<Guid, List<int>>(nodeId.Value, parts));
+ }
+ }
+
+ /// <summary>
+ /// Gets the caches.
+ /// </summary>
+ public ICollection<KeyValuePair<int, Dictionary<int, int>>> Caches
+ {
+ get { return _caches; }
+ }
+
+ /// <summary>
+ /// Gets the partition map: node id -> partitions.
+ /// </summary>
+ public ICollection<KeyValuePair<Guid, List<int>>> PartitionMap
+ {
+ get { return _partitionMap; }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCachePartitionMap.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCachePartitionMap.cs
new file mode 100644
index 0000000..7c478d4
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCachePartitionMap.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.Cache
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+
+ /// <summary>
+ /// Partition map for a cache.
+ /// </summary>
+ internal class ClientCachePartitionMap
+ {
+ /** Cache id. */
+ private readonly int _cacheId;
+
+ /** Array of node id per partition. */
+ private readonly IList<Guid> _partitionNodeIds;
+
+ /** Key configuration. */
+ private readonly IDictionary<int, int> _keyConfiguration;
+
+ public ClientCachePartitionMap(int cacheId, IList<Guid> partitionNodeIds,
+ IDictionary<int, int> keyConfiguration)
+ {
+ Debug.Assert(partitionNodeIds != null && partitionNodeIds.Count > 0);
+
+ _cacheId = cacheId;
+ _keyConfiguration = keyConfiguration;
+ _partitionNodeIds = partitionNodeIds;
+ }
+
+ public int CacheId
+ {
+ get { return _cacheId; }
+ }
+
+ /// <summary>
+ /// Key configuration: map from key type id to affinity key field id.
+ /// </summary>
+ public IDictionary<int, int> KeyConfiguration
+ {
+ get { return _keyConfiguration; }
+ }
+
+ public IList<Guid> PartitionNodeIds
+ {
+ get { return _partitionNodeIds; }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCacheTopologyPartitionMap.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCacheTopologyPartitionMap.cs
new file mode 100644
index 0000000..dfd6851
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientCacheTopologyPartitionMap.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.Cache
+{
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using Apache.Ignite.Core.Cache.Affinity;
+
+ /// <summary>
+ /// Partition maps for specific topology version.
+ /// </summary>
+ internal class ClientCacheTopologyPartitionMap
+ {
+ /** */
+ private readonly Dictionary<int, ClientCachePartitionMap> _cachePartitionMap;
+
+ /** */
+ private readonly AffinityTopologyVersion _affinityTopologyVersion;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="ClientCacheTopologyPartitionMap"/> class.
+ /// </summary>
+ /// <param name="cachePartitionMap">Partition map.</param>
+ /// <param name="affinityTopologyVersion">Topology version.</param>
+ public ClientCacheTopologyPartitionMap(
+ Dictionary<int, ClientCachePartitionMap> cachePartitionMap,
+ AffinityTopologyVersion affinityTopologyVersion)
+ {
+ Debug.Assert(cachePartitionMap != null);
+
+ _cachePartitionMap = cachePartitionMap;
+ _affinityTopologyVersion = affinityTopologyVersion;
+ }
+
+ /// <summary>
+ /// Gets the cache partition map.
+ /// </summary>
+ public Dictionary<int, ClientCachePartitionMap> CachePartitionMap
+ {
+ get { return _cachePartitionMap; }
+ }
+
+ /// <summary>
+ /// Gets the affinity topology version.
+ /// </summary>
+ public AffinityTopologyVersion AffinityTopologyVersion
+ {
+ get { return _affinityTopologyVersion; }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientRendezvousAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientRendezvousAffinityFunction.cs
new file mode 100644
index 0000000..024cfa6
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/Cache/ClientRendezvousAffinityFunction.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client.Cache
+{
+ using System;
+ using System.Diagnostics;
+
+ /// <summary>
+ /// Partial implementation of Rendezvous affinity function for Ignite thin client needs.
+ /// </summary>
+ internal static class ClientRendezvousAffinityFunction
+ {
+ /// <summary>
+ /// Gets partition for given key.
+ /// </summary>
+ /// <param name="keyHash">Key hash code.</param>
+ /// <param name="partitionCount">Partition count.</param>
+ /// <returns>Partition number.</returns>
+ public static int GetPartitionForKey(int keyHash, int partitionCount)
+ {
+ Debug.Assert(partitionCount > 0);
+
+ var mask = (partitionCount & (partitionCount - 1)) == 0 ? partitionCount - 1 : -1;
+
+ if (mask >= 0)
+ {
+ return (keyHash ^ (keyHash >> 16)) & mask;
+ }
+
+ var part = Math.Abs(keyHash % partitionCount);
+
+ return part > 0 ? part : 0;
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
index 8a08368..a230a33 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFailoverSocket.cs
@@ -26,8 +26,11 @@ namespace Apache.Ignite.Core.Impl.Client
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Client;
+ using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
+ using Apache.Ignite.Core.Impl.Client.Cache;
/// <summary>
/// Socket wrapper with reconnect/failover functionality: reconnects on failure.
@@ -43,8 +46,11 @@ namespace Apache.Ignite.Core.Impl.Client
/** Config. */
private readonly IgniteClientConfiguration _config;
+ /** Marshaller. */
+ private readonly Marshaller _marsh;
+
/** Endpoints with corresponding hosts. */
- private readonly List<KeyValuePair<IPEndPoint, string>> _endPoints;
+ private readonly List<SocketEndpoint> _endPoints;
/** Locker. */
private readonly object _syncRoot = new object();
@@ -52,15 +58,30 @@ namespace Apache.Ignite.Core.Impl.Client
/** Disposed flag. */
private bool _disposed;
+ /** Current affinity topology version. */
+ private AffinityTopologyVersion? _affinityTopologyVersion;
+
+ /** Map from node ID to connected socket. */
+ private volatile Dictionary<Guid, ClientSocket> _nodeSocketMap;
+
+ /** Map from cache ID to partition mapping. */
+ private volatile ClientCacheTopologyPartitionMap _distributionMap;
+
+ /** Distribution map locker. */
+ private readonly object _distributionMapSyncRoot = new object();
+
/// <summary>
/// Initializes a new instance of the <see cref="ClientFailoverSocket"/> class.
/// </summary>
/// <param name="config">The configuration.</param>
- public ClientFailoverSocket(IgniteClientConfiguration config)
+ /// <param name="marsh"></param>
+ public ClientFailoverSocket(IgniteClientConfiguration config, Marshaller marsh)
{
Debug.Assert(config != null);
+ Debug.Assert(marsh != null);
_config = config;
+ _marsh = marsh;
#pragma warning disable 618 // Type or member is obsolete
if (config.Host == null && (config.Endpoints == null || config.Endpoints.Count == 0))
@@ -87,6 +108,38 @@ namespace Apache.Ignite.Core.Impl.Client
return GetSocket().DoOutInOp(opId, writeAction, readFunc, errorFunc);
}
+ /// <summary>
+ /// Performs a send-receive operation with affinity awareness.
+ /// </summary>
+ public T DoOutInOpAffinity<T, TKey>(
+ ClientOp opId,
+ Action<IBinaryStream> writeAction,
+ Func<IBinaryStream, T> readFunc,
+ int cacheId,
+ TKey key,
+ Func<ClientStatusCode, string, T> errorFunc = null)
+ {
+ var socket = GetAffinitySocket(cacheId, key) ?? GetSocket();
+
+ return socket.DoOutInOp(opId, writeAction, readFunc, errorFunc);
+ }
+
+ /// <summary>
+ /// Performs an async send-receive operation with affinity awareness.
+ /// </summary>
+ public Task<T> DoOutInOpAffinityAsync<T, TKey>(
+ ClientOp opId,
+ Action<IBinaryStream> writeAction,
+ Func<IBinaryStream, T> readFunc,
+ int cacheId,
+ TKey key,
+ Func<ClientStatusCode, string, T> errorFunc = null)
+ {
+ var socket = GetAffinitySocket(cacheId, key) ?? GetSocket();
+
+ return socket.DoOutInOpAsync(opId, writeAction, readFunc, errorFunc);
+ }
+
/** <inheritdoc /> */
public Task<T> DoOutInOpAsync<T>(ClientOp opId, Action<IBinaryStream> writeAction, Func<IBinaryStream, T> readFunc, Func<ClientStatusCode, string, T> errorFunc = null)
{
@@ -130,12 +183,9 @@ namespace Apache.Ignite.Core.Impl.Client
{
lock (_syncRoot)
{
- if (_disposed)
- {
- throw new ObjectDisposedException("ClientFailoverSocket");
- }
+ ThrowIfDisposed();
- if (_socket == null)
+ if (_socket == null || (_socket.IsDisposed && !_config.ReconnectDisabled))
{
Connect();
}
@@ -144,6 +194,48 @@ namespace Apache.Ignite.Core.Impl.Client
}
}
+ private ClientSocket GetAffinitySocket<TKey>(int cacheId, TKey key)
+ {
+ if (!_config.EnableAffinityAwareness)
+ {
+ return null;
+ }
+
+ UpdateDistributionMap(cacheId);
+
+ var distributionMap = _distributionMap;
+ var socketMap = _nodeSocketMap;
+ ClientCachePartitionMap cachePartMap;
+
+ if (socketMap == null || !distributionMap.CachePartitionMap.TryGetValue(cacheId, out cachePartMap))
+ {
+ return null;
+ }
+
+ var partition = GetPartition(key, cachePartMap.PartitionNodeIds.Count, cachePartMap.KeyConfiguration);
+ var nodeId = cachePartMap.PartitionNodeIds[partition];
+
+ ClientSocket socket;
+ if (socketMap.TryGetValue(nodeId, out socket) && !socket.IsDisposed)
+ {
+ return socket;
+ }
+
+ return null;
+ }
+
+ /// <summary>
+ /// Throws if disposed.
+ /// </summary>
+ /// <exception cref="ObjectDisposedException"></exception>
+ private void ThrowIfDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException("ClientFailoverSocket");
+ }
+ }
+
/** <inheritdoc /> */
[SuppressMessage("Microsoft.Usage", "CA1816:CallGCSuppressFinalizeCorrectly",
Justification = "There is no finalizer.")]
@@ -158,6 +250,16 @@ namespace Apache.Ignite.Core.Impl.Client
_socket.Dispose();
_socket = null;
}
+
+ if (_nodeSocketMap != null)
+ {
+ foreach (var socket in _nodeSocketMap.Values)
+ {
+ socket.Dispose();
+ }
+
+ _nodeSocketMap = null;
+ }
}
}
@@ -168,16 +270,27 @@ namespace Apache.Ignite.Core.Impl.Client
{
List<Exception> errors = null;
var startIdx = (int) Interlocked.Increment(ref _endPointIndex);
+ _socket = null;
for (var i = 0; i < _endPoints.Count; i++)
{
var idx = (startIdx + i) % _endPoints.Count;
var endPoint = _endPoints[idx];
+ if (endPoint.Socket != null && !endPoint.Socket.IsDisposed)
+ {
+ _socket = endPoint.Socket;
+ break;
+ }
+
try
{
- _socket = new ClientSocket(_config, endPoint.Key, endPoint.Value, OnSocketError);
- return;
+ _socket = new ClientSocket(_config, endPoint.EndPoint, endPoint.Host, null,
+ OnAffinityTopologyVersionChange);
+
+ endPoint.Socket = _socket;
+
+ break;
}
catch (SocketException e)
{
@@ -190,31 +303,30 @@ namespace Apache.Ignite.Core.Impl.Client
}
}
- throw new AggregateException("Failed to establish Ignite thin client connection, " +
- "examine inner exceptions for details.", errors);
- }
-
- /// <summary>
- /// Called when socket error occurs.
- /// </summary>
- private void OnSocketError()
- {
- if (_config.ReconnectDisabled)
+ if (_socket == null && errors != null)
{
- return;
+ throw new AggregateException("Failed to establish Ignite thin client connection, " +
+ "examine inner exceptions for details.", errors);
}
- // Reconnect on next operation.
- lock (_syncRoot)
+ if (_config.EnableAffinityAwareness)
{
- _socket = null;
+ InitSocketMap();
}
}
/// <summary>
+ /// Updates current Affinity Topology Version.
+ /// </summary>
+ private void OnAffinityTopologyVersionChange(AffinityTopologyVersion affinityTopologyVersion)
+ {
+ _affinityTopologyVersion = affinityTopologyVersion;
+ }
+
+ /// <summary>
/// Gets the endpoints: all combinations of IP addresses and ports according to configuration.
/// </summary>
- private static IEnumerable<KeyValuePair<IPEndPoint, string>> GetIpEndPoints(IgniteClientConfiguration cfg)
+ private static IEnumerable<SocketEndpoint> GetIpEndPoints(IgniteClientConfiguration cfg)
{
foreach (var e in Endpoint.GetEndpoints(cfg))
{
@@ -228,7 +340,7 @@ namespace Apache.Ignite.Core.Impl.Client
{
for (var i = 0; i <= e.PortRange; i++)
{
- yield return new KeyValuePair<IPEndPoint, string>(new IPEndPoint(ip, e.Port + i), host);
+ yield return new SocketEndpoint(new IPEndPoint(ip, e.Port + i), host);
}
}
else
@@ -237,11 +349,160 @@ namespace Apache.Ignite.Core.Impl.Client
{
foreach (var x in Dns.GetHostEntry(host).AddressList)
{
- yield return new KeyValuePair<IPEndPoint, string>(new IPEndPoint(x, e.Port + i), host);
+ yield return new SocketEndpoint(new IPEndPoint(x, e.Port + i), host);
+ }
+ }
+ }
+ }
+ }
+
+ /// <summary>
+ /// Returns a value indicating whether distribution map is up to date.
+ /// </summary>
+ /// <returns></returns>
+ private bool IsDistributionMapUpToDate(ClientCacheTopologyPartitionMap map = null)
+ {
+ map = map ?? _distributionMap;
+
+ if (map == null || _affinityTopologyVersion == null)
+ {
+ return false;
+ }
+
+ return map.AffinityTopologyVersion >= _affinityTopologyVersion.Value;
+ }
+
+ /// <summary>
+ /// Updates the partition mapping.
+ /// </summary>
+ private void UpdateDistributionMap(int cacheId)
+ {
+ if (IsDistributionMapUpToDate())
+ return; // Up to date.
+
+ lock (_distributionMapSyncRoot)
+ {
+ if (IsDistributionMapUpToDate())
+ return; // Up to date.
+
+ DoOutInOp(
+ ClientOp.CachePartitions,
+ s => WriteDistributionMapRequest(cacheId, s),
+ s => ReadDistributionMapResponse(s));
+ }
+ }
+
+ private object ReadDistributionMapResponse(IBinaryStream s)
+ {
+ var affinityTopologyVersion = new AffinityTopologyVersion(s.ReadLong(), s.ReadInt());
+ var size = s.ReadInt();
+ var mapping = new Dictionary<int, ClientCachePartitionMap>();
+
+ for (int i = 0; i < size; i++)
+ {
+ var grp = new ClientCacheAffinityAwarenessGroup(s);
+
+ // Count partitions to avoid reallocating array.
+ int maxPartNum = 0;
+ foreach (var partMap in grp.PartitionMap)
+ {
+ foreach (var part in partMap.Value)
+ {
+ if (part > maxPartNum)
+ {
+ maxPartNum = part;
}
}
}
+
+ // Populate partition array.
+ var partNodeIds = new Guid[maxPartNum + 1];
+ foreach (var partMap in grp.PartitionMap)
+ {
+ foreach (var part in partMap.Value)
+ {
+ partNodeIds[part] = partMap.Key;
+ }
+ }
+
+ foreach (var cache in grp.Caches)
+ {
+ mapping[cache.Key] = new ClientCachePartitionMap(cache.Key, partNodeIds, cache.Value);
+ }
}
+
+ _distributionMap = new ClientCacheTopologyPartitionMap(mapping, affinityTopologyVersion);
+
+ return null;
+ }
+
+ private void WriteDistributionMapRequest(int cacheId, IBinaryStream s)
+ {
+ if (_distributionMap != null)
+ {
+ // Map exists: request update for all caches.
+ var mapContainsCacheId = _distributionMap.CachePartitionMap.ContainsKey(cacheId);
+ var count = _distributionMap.CachePartitionMap.Count;
+ if (!mapContainsCacheId)
+ {
+ count++;
+ }
+
+ s.WriteInt(count);
+
+ foreach (var cachePartitionMap in _distributionMap.CachePartitionMap)
+ {
+ s.WriteInt(cachePartitionMap.Key);
+ }
+
+ if (!mapContainsCacheId)
+ {
+ s.WriteInt(cacheId);
+ }
+ }
+ else
+ {
+ // Map does not exist yet: request update for specified cache only.
+ s.WriteInt(1);
+ s.WriteInt(cacheId);
+ }
+ }
+
+ private int GetPartition<TKey>(TKey key, int partitionCount, IDictionary<int, int> keyConfiguration)
+ {
+ var keyHash = BinaryHashCodeUtils.GetHashCode(key, _marsh, keyConfiguration);
+ return ClientRendezvousAffinityFunction.GetPartitionForKey(keyHash, partitionCount);
+ }
+
+ private void InitSocketMap()
+ {
+ var map = new Dictionary<Guid, ClientSocket>();
+
+ foreach (var endPoint in _endPoints)
+ {
+ if (endPoint.Socket == null || endPoint.Socket.IsDisposed)
+ {
+ try
+ {
+ var socket = new ClientSocket(_config, endPoint.EndPoint, endPoint.Host, null,
+ OnAffinityTopologyVersionChange);
+
+ endPoint.Socket = socket;
+ }
+ catch (SocketException)
+ {
+ continue;
+ }
+ }
+
+ var nodeId = endPoint.Socket.ServerNodeId;
+ if (nodeId != null)
+ {
+ map[nodeId.Value] = endPoint.Socket;
+ }
+ }
+
+ _nodeSocketMap = map;
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFlags.cs
similarity index 51%
copy from modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs
copy to modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFlags.cs
index b8a8f89..bcfcf76 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientFlags.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,34 +15,17 @@
* limitations under the License.
*/
-using System;
-
-#pragma warning disable 414 // Unused FuncPtr
-
-namespace Apache.Ignite.Core.Impl.Unmanaged.Jni
+namespace Apache.Ignite.Core.Impl.Client
{
- using System.Diagnostics.CodeAnalysis;
- using System.Runtime.InteropServices;
+ using System;
/// <summary>
- /// JNINativeMethod structure for registering Java -> .NET callbacks.
+ /// Client operation flags.
/// </summary>
- [SuppressMessage("Microsoft.Design", "CA1049:TypesThatOwnNativeResourcesShouldBeDisposable")]
- internal struct NativeMethod
+ [Flags]
+ internal enum ClientFlags : short
{
- /// <summary>
- /// Method name, char*.
- /// </summary>
- public IntPtr Name;
-
- /// <summary>
- /// Method signature, char*.
- /// </summary>
- public IntPtr Signature;
-
- /// <summary>
- /// Function pointer (from <see cref="Marshal.GetFunctionPointerForDelegate"/>).
- /// </summary>
- public IntPtr FuncPtr;
+ Error = 1,
+ AffinityTopologyChanged = 2
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
index fae3bb9..c5e0f7a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientOp.cs
@@ -54,6 +54,7 @@ namespace Apache.Ignite.Core.Impl.Client
CacheGetOrCreateWithConfiguration = 1054,
CacheGetConfiguration = 1055,
CacheDestroy = 1056,
+ CachePartitions = 1101,
// Queries.
QueryScan = 2000,
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
index e2567ef..45444b9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/ClientSocket.cs
@@ -27,6 +27,7 @@ namespace Apache.Ignite.Core.Impl.Client
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
+ using Apache.Ignite.Core.Cache.Affinity;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Binary.IO;
@@ -49,8 +50,11 @@ namespace Apache.Ignite.Core.Impl.Client
/** Version 1.3.0. */
public static readonly ClientProtocolVersion Ver130 = new ClientProtocolVersion(1, 3, 0);
+ /** Version 1.4.0. */
+ public static readonly ClientProtocolVersion Ver140 = new ClientProtocolVersion(1, 4, 0);
+
/** Current version. */
- public static readonly ClientProtocolVersion CurrentProtocolVersion = Ver130;
+ public static readonly ClientProtocolVersion CurrentProtocolVersion = Ver140;
/** Handshake opcode. */
private const byte OpHandshake = 1;
@@ -98,8 +102,8 @@ namespace Apache.Ignite.Core.Impl.Client
/** Disposed flag. */
private bool _isDisposed;
- /** Error callback. */
- private readonly Action _onError;
+ /** Topology version update callback. */
+ private readonly Action<AffinityTopologyVersion> _topVerCallback;
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket" /> class.
@@ -107,14 +111,15 @@ namespace Apache.Ignite.Core.Impl.Client
/// <param name="clientConfiguration">The client configuration.</param>
/// <param name="endPoint">The end point to connect to.</param>
/// <param name="host">The host name (required for SSL).</param>
- /// <param name="onError">Error callback.</param>
/// <param name="version">Protocol version.</param>
+ /// <param name="topVerCallback">Topology version update callback.</param>
public ClientSocket(IgniteClientConfiguration clientConfiguration, EndPoint endPoint, string host,
- Action onError = null, ClientProtocolVersion? version = null)
+ ClientProtocolVersion? version = null,
+ Action<AffinityTopologyVersion> topVerCallback = null)
{
Debug.Assert(clientConfiguration != null);
- _onError = onError;
+ _topVerCallback = topVerCallback;
_timeout = clientConfiguration.SocketTimeout;
_socket = Connect(clientConfiguration, endPoint);
@@ -205,6 +210,19 @@ namespace Apache.Ignite.Core.Impl.Client
public EndPoint LocalEndPoint { get { return _socket.LocalEndPoint; } }
/// <summary>
+ /// Gets the ID of the connected server node.
+ /// </summary>
+ public Guid? ServerNodeId { get; private set; }
+
+ /// <summary>
+ /// Gets a value indicating whether this socket is disposed.
+ /// </summary>
+ public bool IsDisposed
+ {
+ get { return _isDisposed; }
+ }
+
+ /// <summary>
/// Starts waiting for the new message.
/// </summary>
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
@@ -264,10 +282,32 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Decodes the response that we got from <see cref="HandleResponse"/>.
/// </summary>
- private static T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc,
+ private T DecodeResponse<T>(BinaryHeapStream stream, Func<IBinaryStream, T> readFunc,
Func<ClientStatusCode, string, T> errorFunc)
{
- var statusCode = (ClientStatusCode)stream.ReadInt();
+ ClientStatusCode statusCode;
+
+ if (ServerVersion.CompareTo(Ver140) >= 0)
+ {
+ var flags = (ClientFlags) stream.ReadShort();
+
+ if ((flags & ClientFlags.AffinityTopologyChanged) == ClientFlags.AffinityTopologyChanged)
+ {
+ var topVer = new AffinityTopologyVersion(stream.ReadLong(), stream.ReadInt());
+ if (_topVerCallback != null)
+ {
+ _topVerCallback(topVer);
+ }
+ }
+
+ statusCode = (flags & ClientFlags.Error) == ClientFlags.Error
+ ? (ClientStatusCode) stream.ReadInt()
+ : ClientStatusCode.Success;
+ }
+ else
+ {
+ statusCode = (ClientStatusCode) stream.ReadInt();
+ }
if (statusCode == ClientStatusCode.Success)
{
@@ -330,6 +370,11 @@ namespace Apache.Ignite.Core.Impl.Client
if (success)
{
+ if (version.CompareTo(Ver140) >= 0)
+ {
+ ServerNodeId = BinaryUtils.Marshaller.Unmarshal<Guid>(stream);
+ }
+
ServerVersion = version;
return;
@@ -399,10 +444,6 @@ namespace Apache.Ignite.Core.Impl.Client
{
// Disconnected.
_exception = _exception ?? new SocketException((int) SocketError.ConnectionAborted);
- if (_onError != null)
- {
- _onError();
- }
Dispose();
CheckException();
}
@@ -521,12 +562,10 @@ namespace Apache.Ignite.Core.Impl.Client
{
_stream.Write(buf, 0, len);
}
- catch (Exception)
+ catch (Exception e)
{
- if (_onError != null)
- {
- _onError();
- }
+ _exception = e;
+ Dispose();
throw;
}
}
@@ -540,12 +579,10 @@ namespace Apache.Ignite.Core.Impl.Client
{
return _stream.Read(buf, pos, len);
}
- catch (Exception)
+ catch (Exception e)
{
- if (_onError != null)
- {
- _onError();
- }
+ _exception = e;
+ Dispose();
throw;
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
index c59e9fc..0451cbc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/IgniteClient.cs
@@ -40,7 +40,7 @@ namespace Apache.Ignite.Core.Impl.Client
internal class IgniteClient : IIgniteInternal, IIgniteClient
{
/** Socket. */
- private readonly IClientSocket _socket;
+ private readonly ClientFailoverSocket _socket;
/** Marshaller. */
private readonly Marshaller _marsh;
@@ -64,13 +64,13 @@ namespace Apache.Ignite.Core.Impl.Client
_configuration = new IgniteClientConfiguration(clientConfiguration);
- _socket = new ClientFailoverSocket(_configuration);
-
_marsh = new Marshaller(_configuration.BinaryConfiguration)
{
Ignite = this
};
+ _socket = new ClientFailoverSocket(_configuration, _marsh);
+
_binProc = _configuration.BinaryProcessor ?? new BinaryProcessorClient(_socket);
_binary = new Binary(_marsh);
@@ -79,7 +79,7 @@ namespace Apache.Ignite.Core.Impl.Client
/// <summary>
/// Gets the socket.
/// </summary>
- public IClientSocket Socket
+ public ClientFailoverSocket Socket
{
get { return _socket; }
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/SocketEndpoint.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/SocketEndpoint.cs
new file mode 100644
index 0000000..cb3e3e1
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Client/SocketEndpoint.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Impl.Client
+{
+ using System.Diagnostics;
+ using System.Net;
+
+ /// <summary>
+ /// Internal representation of client socket endpoint.
+ /// </summary>
+ internal class SocketEndpoint
+ {
+ /** */
+ private readonly IPEndPoint _endPoint;
+
+ /** */
+ private readonly string _host;
+
+ /** */
+ private volatile ClientSocket _socket;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="SocketEndpoint"/> class.
+ /// </summary>
+ public SocketEndpoint(IPEndPoint endPoint, string host)
+ {
+ _endPoint = endPoint;
+ _host = host;
+ }
+
+ /// <summary>
+ /// Gets the socket.
+ /// </summary>
+ public ClientSocket Socket
+ {
+ get { return _socket; }
+ set
+ {
+ Debug.Assert(value != null);
+ _socket = value;
+ }
+ }
+
+ /// <summary>
+ /// Gets the IPEndPoint.
+ /// </summary>
+ public IPEndPoint EndPoint
+ {
+ get { return _endPoint; }
+ }
+
+ /// <summary>
+ /// Gets the host.
+ /// </summary>
+ public string Host
+ {
+ get { return _host; }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs
index b8a8f89..54b239c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/NativeMethod.cs
@@ -43,6 +43,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged.Jni
/// <summary>
/// Function pointer (from <see cref="Marshal.GetFunctionPointerForDelegate"/>).
/// </summary>
+ // ReSharper disable once NotAccessedField.Global
public IntPtr FuncPtr;
}
}