You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/15 06:35:59 UTC
[ignite-3] branch main updated: IGNITE-16222 .NET: Thin 3.0: Add RetryPolicy (#719)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f8f4b2a IGNITE-16222 .NET: Thin 3.0: Add RetryPolicy (#719)
f8f4b2a is described below
commit f8f4b2a361419d43b91b8ee2ce4b5c85883f3aae
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue Mar 15 09:35:51 2022 +0300
IGNITE-16222 .NET: Thin 3.0: Add RetryPolicy (#719)
* Add `IClientRetryPolicy` interface.
* Add predefined policies: `RetryAllPolicy`, `RetryReadPolicy`, `RetryNonePolicy`, `RetryLimitPolicy`.
* Add `IgniteClientConfiguration.RetryPolicy` property, defaults to `RetryNonePolicy` (no retries by default).
* Refactor `PooledArrayBufferWriter` to allow sending the same serialized request multiple times.
* Fix table API to use `FailoverSocket` for non-transactional operations to leverage retry/reconnect.
https://cwiki.apache.org/confluence/display/IGNITE/IEP-82+Thin+Client+Retry+Policy
---
.../Buffers/PooledArrayBufferWriterTests.cs | 66 +------
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 201 +++++++++++++++++++
.../dotnet/Apache.Ignite.Tests/FakeServerTests.cs | 75 +++++++
.../Apache.Ignite.Tests/ProjectFilesTests.cs | 1 +
.../Proto/ClientOpExtensionsTest.cs} | 30 ++-
.../Proto/MessagePackExtensionsTest.cs | 8 +-
.../dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs | 218 +++++++++++++++++++++
.../RetryReadPolicyTests.cs} | 32 ++-
.../Serialization/ObjectSerializerHandlerTests.cs | 12 +-
.../dotnet/Apache.Ignite/ClientOperationType.cs | 112 +++++++++++
.../Proto/MessagePackUtil.cs => IRetryPolicy.cs} | 30 ++-
.../MessagePackUtil.cs => IRetryPolicyContext.cs} | 39 ++--
.../Apache.Ignite/IgniteClientConfiguration.cs | 12 ++
.../Internal/Buffers/PooledArrayBufferWriter.cs | 54 +----
.../Apache.Ignite/Internal/ClientFailoverSocket.cs | 105 +++++++++-
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 166 ++++++++++++----
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 6 -
.../Internal/Proto/ClientOpExtensions.cs | 61 ++++++
.../Internal/Proto/MessagePackUtil.cs | 66 +++++--
.../Apache.Ignite/Internal/RetryPolicyContext.cs | 58 ++++++
.../Apache.Ignite/Internal/Table/RecordView.cs | 14 +-
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 25 +--
.../Proto/MessagePackUtil.cs => RetryAllPolicy.cs} | 24 +--
.../MessagePackUtil.cs => RetryLimitPolicy.cs} | 32 +--
.../MessagePackUtil.cs => RetryNonePolicy.cs} | 26 +--
.../dotnet/Apache.Ignite/RetryReadPolicy.cs | 66 +++++++
26 files changed, 1227 insertions(+), 312 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
index 15cc80a..b281748 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
@@ -26,78 +26,18 @@ namespace Apache.Ignite.Tests.Buffers
public class PooledArrayBufferWriterTests
{
[Test]
- public void TestBufferWriterPrependsMessageLength()
+ public void TestBufferWriterReservesPrefixSpace()
{
using var bufferWriter = new PooledArrayBufferWriter();
-
- // With payload.
var writer = bufferWriter.GetMessageWriter();
writer.Write(1);
writer.Write("A");
writer.Flush();
- var res = bufferWriter.GetWrittenMemory().ToArray();
-
- var expectedBytes = new byte[]
- {
- // 4 bytes BE length + 1 byte int + 1 byte fixstr + 1 byte char.
- 0, 0, 0, 3, 1, 0xa1, (byte)'A'
- };
-
- CollectionAssert.AreEqual(expectedBytes, res);
- }
-
- [Test]
- public void TestBufferWriterPrependsPrefixAndMessageLength()
- {
- using var bufferWriter = new PooledArrayBufferWriter();
-
- var writer = bufferWriter.GetMessageWriter();
- writer.Write(1);
- writer.Write("A");
- writer.Flush();
-
- var prefixWriter = bufferWriter.GetPrefixWriter(3);
- prefixWriter.Write(7);
- prefixWriter.Write(8);
- prefixWriter.Write(9);
- prefixWriter.Flush();
-
- var res = bufferWriter.GetWrittenMemory().ToArray();
-
- var expectedBytes = new byte[]
- {
- // 4 bytes BE length + 3 bytes prefix + 1 byte int + 1 byte fixstr + 1 byte char.
- 0, 0, 0, 6, 7, 8, 9, 1, 0xa1, (byte)'A'
- };
-
- CollectionAssert.AreEqual(expectedBytes, res);
- }
-
- [Test]
- public void TestEmptyBufferWriterPrependsZeroMessageLength()
- {
- using var bufferWriter = new PooledArrayBufferWriter();
-
- var res = bufferWriter.GetWrittenMemory().ToArray();
-
- CollectionAssert.AreEqual(new byte[] { 0, 0, 0, 0 }, res);
- }
-
- [Test]
- public void TestEmptyBufferWriterPrependsPrefix()
- {
- using var bufferWriter = new PooledArrayBufferWriter();
-
- var writer = bufferWriter.GetPrefixWriter(2);
- writer.Write(7);
- writer.Write(8);
- writer.Flush();
-
- var res = bufferWriter.GetWrittenMemory().ToArray();
+ var res = bufferWriter.GetWrittenMemory()[PooledArrayBufferWriter.ReservedPrefixSize..].ToArray();
- CollectionAssert.AreEqual(new byte[] { 0, 0, 0, 2, 7, 8 }, res);
+ CollectionAssert.AreEqual(new byte[] { 1, 0xa1, (byte)'A' }, res);
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
new file mode 100644
index 0000000..58ef00d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+ using System;
+ using System.Buffers;
+ using System.Net;
+ using System.Net.Sockets;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Internal.Proto;
+ using MessagePack;
+
+ /// <summary>
+ /// Fake Ignite server for test purposes.
+ /// </summary>
+ public sealed class FakeServer : IDisposable
+ {
+ public const string Err = "Err!";
+
+ public const string ExistingTableName = "tbl1";
+
+ private readonly Socket _listener;
+
+ private readonly CancellationTokenSource _cts = new();
+
+ private readonly Func<int, bool> _shouldDropConnection;
+
+ public FakeServer(Func<int, bool>? shouldDropConnection = null)
+ {
+ _shouldDropConnection = shouldDropConnection ?? (_ => false);
+ _listener = new Socket(IPAddress.Loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+ _listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
+ _listener.Listen(backlog: 1);
+
+ Task.Run(ListenLoop);
+ }
+
+ public async Task<IIgniteClient> ConnectClientAsync(IgniteClientConfiguration? cfg = null)
+ {
+ var port = ((IPEndPoint)_listener.LocalEndPoint).Port;
+
+ cfg ??= new IgniteClientConfiguration();
+
+ cfg.Endpoints.Clear();
+ cfg.Endpoints.Add("127.0.0.1:" + port);
+
+ return await IgniteClient.StartAsync(cfg);
+ }
+
+ public void Dispose()
+ {
+ _cts.Cancel();
+ _listener.Disconnect(true);
+ _listener.Dispose();
+ _cts.Dispose();
+ }
+
+ private static int ReceiveMessageSize(Socket handler) =>
+ IPAddress.NetworkToHostOrder(BitConverter.ToInt32(ReceiveBytes(handler, 4)));
+
+ private static byte[] ReceiveBytes(Socket socket, int size)
+ {
+ int received = 0;
+ var buf = new byte[size];
+
+ while (received < size)
+ {
+ var res = socket.Receive(buf, received, size - received, SocketFlags.None);
+
+ if (res == 0)
+ {
+ throw new Exception("Connection lost");
+ }
+
+ received += res;
+ }
+
+ return buf;
+ }
+
+ private void ListenLoop()
+ {
+ int requestCount = 0;
+
+ while (!_cts.IsCancellationRequested)
+ {
+ using Socket handler = _listener.Accept();
+
+ // Read handshake.
+ ReceiveBytes(handler, 4); // Magic.
+ var msgSize = ReceiveMessageSize(handler);
+ ReceiveBytes(handler, msgSize);
+
+ // Write handshake response.
+ handler.Send(ProtoCommon.MagicBytes);
+ handler.Send(new byte[] { 0, 0, 0, 7 }); // Size.
+ handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128 });
+
+ while (!_cts.IsCancellationRequested)
+ {
+ msgSize = ReceiveMessageSize(handler);
+ var msg = ReceiveBytes(handler, msgSize);
+
+ if (_shouldDropConnection(++requestCount))
+ {
+ break;
+ }
+
+ // Assume fixint8.
+ var opCode = (ClientOp)msg[0];
+ var requestId = msg[1];
+
+ if (opCode == ClientOp.TablesGet)
+ {
+ handler.Send(new byte[] { 0, 0, 0, 4 }); // Size.
+ handler.Send(new byte[] { 0, requestId, 0, 128 }); // Empty map.
+
+ continue;
+ }
+
+ if (opCode == ClientOp.TableGet)
+ {
+ var reader = new MessagePackReader(msg.AsMemory()[2..]);
+ var tableName = reader.ReadString();
+
+ if (tableName == ExistingTableName)
+ {
+ handler.Send(new byte[] { 0, 0, 0, 21 }); // Size.
+ handler.Send(new byte[] { 0, requestId, 0 });
+
+ var arrayBufferWriter = new ArrayBufferWriter<byte>();
+ var writer = new MessagePackWriter(arrayBufferWriter);
+ writer.Write(Guid.Empty);
+ writer.Flush();
+
+ handler.Send(arrayBufferWriter.WrittenSpan);
+
+ continue;
+ }
+ }
+
+ if (opCode == ClientOp.SchemasGet)
+ {
+ handler.Send(new byte[] { 0, 0, 0, 6 }); // Size.
+ handler.Send(new byte[] { 0, requestId, 0 });
+
+ var arrayBufferWriter = new ArrayBufferWriter<byte>();
+ var writer = new MessagePackWriter(arrayBufferWriter);
+ writer.WriteMapHeader(1);
+ writer.Write(1); // Version.
+ writer.WriteArrayHeader(0); // Columns.
+ writer.Flush();
+
+ handler.Send(arrayBufferWriter.WrittenSpan);
+
+ continue;
+ }
+
+ if (opCode == ClientOp.TupleUpsert)
+ {
+ handler.Send(new byte[] { 0, 0, 0, 3 }); // Size.
+ handler.Send(new byte[] { 0, requestId, 0 }); // No payload.
+
+ continue;
+ }
+
+ if (opCode == ClientOp.TxBegin)
+ {
+ handler.Send(new byte[] { 0, 0, 0, 4 }); // Size.
+ handler.Send(new byte[] { 0, requestId, 0, 0 }); // Tx id.
+
+ continue;
+ }
+
+ // Fake error message for any other op code.
+ handler.Send(new byte[] { 0, 0, 0, 8 }); // Size.
+ handler.Send(new byte[] { 0, requestId, 1, 160 | 4, (byte)Err[0], (byte)Err[1], (byte)Err[2], (byte)Err[3] });
+ }
+
+ handler.Disconnect(true);
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
new file mode 100644
index 0000000..22a5c64
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+ using System.Threading.Tasks;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests that <see cref="FakeServer"/> works as expected.
+ /// </summary>
+ public class FakeServerTests
+ {
+ [Test]
+ public async Task TestConnectToFakeServerAndGetTablesReturnsEmptyList()
+ {
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ var tables = await client.Tables.GetTablesAsync();
+ Assert.AreEqual(0, tables.Count);
+ }
+
+ [Test]
+ public async Task TestConnectToFakeServerAndGetTableThrowsError()
+ {
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTableAsync("t"));
+ Assert.AreEqual(FakeServer.Err, ex!.Message);
+ }
+
+ [Test]
+ public async Task TestConnectToFakeServerAndGetExistingTableReturnsTable()
+ {
+ using var server = new FakeServer();
+ using var client = await server.ConnectClientAsync();
+
+ var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ Assert.IsNotNull(table);
+ Assert.AreEqual(FakeServer.ExistingTableName, table!.Name);
+ }
+
+ [Test]
+ public async Task TestFakeServerDropsConnectionOnSpecifiedRequestCount()
+ {
+ using var server = new FakeServer(reqId => reqId % 3 == 0);
+ using var client = await server.ConnectClientAsync();
+
+ // 2 requests succeed, 3rd fails.
+ await client.Tables.GetTablesAsync();
+ await client.Tables.GetTablesAsync();
+
+ Assert.CatchAsync(async () => await client.Tables.GetTablesAsync());
+
+ // Reconnect by FailoverSocket logic.
+ await client.Tables.GetTablesAsync();
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
index af20911..09208a3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
@@ -62,6 +62,7 @@ namespace Apache.Ignite.Tests
if (file.Contains(InternalDir, StringComparison.Ordinal) ||
file.Contains(".Tests", StringComparison.Ordinal) ||
file.Contains(".Benchmarks", StringComparison.Ordinal) ||
+ file.EndsWith("RetryLimitPolicy.cs", StringComparison.Ordinal) ||
file.EndsWith("Exception.cs", StringComparison.Ordinal))
{
continue;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ClientOpExtensionsTest.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ClientOpExtensionsTest.cs
index 19a08df..50ccdba 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ClientOpExtensionsTest.cs
@@ -15,30 +15,26 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite.Tests.Proto
{
- using MessagePack;
+ using System.Linq;
+ using Internal.Proto;
+ using NUnit.Framework;
/// <summary>
- /// MessagePack utils.
+ /// Tests for <see cref="ClientOpExtensions"/>.
/// </summary>
- internal static class MessagePackUtil
+ public class ClientOpExtensionsTest
{
- /// <summary>
- /// Gets the write size for the specified value.
- /// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
+ [Test]
+ public void TestToPublicOperationTypeSupportsAllOps()
{
- return value switch
+ var ops = typeof(ClientOp).GetEnumValues().Cast<ClientOp>().ToList();
+
+ foreach (var op in ops)
{
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
+ Assert.DoesNotThrow(() => op.ToPublicOperationType(), op.ToString());
+ }
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
index 0197e20..e97b056 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
@@ -118,7 +118,11 @@ namespace Apache.Ignite.Tests.Proto
writer.Write(Guid.Parse(JavaUuidString));
writer.Flush();
- var bytes = bufferWriter.GetWrittenMemory()[4..].ToArray().Select(b => (sbyte) b).ToArray();
+ var bytes = bufferWriter.GetWrittenMemory()[PooledArrayBufferWriter.ReservedPrefixSize..]
+ .ToArray()
+ .Select(b => (sbyte) b)
+ .ToArray();
+
CollectionAssert.AreEqual(JavaUuidBytes, bytes);
}
@@ -154,7 +158,7 @@ namespace Apache.Ignite.Tests.Proto
var bufferWriter = new PooledArrayBufferWriter();
write(bufferWriter);
- var mem = bufferWriter.GetWrittenMemory()[4..]; // Skip length.
+ var mem = bufferWriter.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize);
return read(mem);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
new file mode 100644
index 0000000..d8c35d3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+ using System.Collections.Generic;
+ using System.Threading.Tasks;
+ using Ignite.Table;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests client behavior with different <see cref="IgniteClientConfiguration.RetryPolicy"/> settings.
+ /// </summary>
+ public class RetryPolicyTests
+ {
+ private const int IterCount = 100;
+
+ [Test]
+ public async Task TestFailoverWithRetryPolicyCompletesOperationWithoutException()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new RetryAllPolicy { RetryLimit = 1 }
+ };
+
+ using var server = new FakeServer(reqId => reqId % 2 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ for (int i = 0; i < IterCount; i++)
+ {
+ await client.Tables.GetTablesAsync();
+ }
+ }
+
+ [Test]
+ public async Task TestFailoverWithRetryPolicyDoesNotRetryUnrelatedErrors()
+ {
+ var cfg = new IgniteClientConfiguration { RetryPolicy = RetryAllPolicy.Instance };
+
+ using var server = new FakeServer(reqId => reqId % 2 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTableAsync("bad-table"));
+ Assert.AreEqual(FakeServer.Err, ex!.Message);
+ }
+
+ [Test]
+ public async Task TestFailoverWithRetryPolicyThrowsOnRetryLimitExceeded()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new TestRetryPolicy { RetryLimit = 5 }
+ };
+
+ using var server = new FakeServer(reqId => reqId > 1);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ await client.Tables.GetTablesAsync();
+
+ var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTablesAsync());
+ Assert.AreEqual("Operation failed after 5 retries, examine InnerException for details.", ex!.Message);
+ }
+
+ [Test]
+ public async Task TestZeroRetryLimitDoesNotLimitRetryCount()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new RetryAllPolicy { RetryLimit = 0 }
+ };
+
+ using var server = new FakeServer(reqId => reqId % 10 != 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ for (var i = 0; i < IterCount; i++)
+ {
+ await client.Tables.GetTablesAsync();
+ }
+ }
+
+ [Test]
+ public async Task TestRetryPolicyIsDisabledByDefault()
+ {
+ using var server = new FakeServer(reqId => reqId > 1);
+ using var client = await server.ConnectClientAsync();
+
+ await client.Tables.GetTablesAsync();
+
+ Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTablesAsync());
+ }
+
+ [Test]
+ public async Task TestCustomRetryPolicyIsInvokedWithCorrectContext()
+ {
+ var testRetryPolicy = new TestRetryPolicy { RetryLimit = 3 };
+
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = testRetryPolicy
+ };
+
+ using var server = new FakeServer(reqId => reqId % 3 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ for (var i = 0; i < IterCount; i++)
+ {
+ await client.Tables.GetTablesAsync();
+ }
+
+ Assert.AreEqual(49, testRetryPolicy.Invocations.Count);
+
+ var inv = testRetryPolicy.Invocations[0];
+
+ Assert.AreNotSame(cfg, inv.Configuration);
+ Assert.AreSame(testRetryPolicy, inv.Configuration.RetryPolicy);
+ Assert.AreEqual(ClientOperationType.TablesGet, inv.Operation);
+ Assert.AreEqual(0, inv.Iteration);
+ }
+
+ [Test]
+ public async Task TestTableOperationWithoutTxIsRetried()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new TestRetryPolicy { RetryLimit = 1 }
+ };
+
+ using var server = new FakeServer(reqId => reqId % 2 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ for (int i = 0; i < IterCount; i++)
+ {
+ var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+
+ await table!.RecordBinaryView.UpsertAsync(null, new IgniteTuple());
+ }
+ }
+
+ [Test]
+ public async Task TestTableOperationWithTxIsNotRetried()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new TestRetryPolicy()
+ };
+
+ using var server = new FakeServer(reqId => reqId % 2 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+ var tx = await client.Transactions.BeginAsync();
+
+ var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+
+ var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await table!.RecordBinaryView.UpsertAsync(tx, new IgniteTuple()));
+ StringAssert.StartsWith("Socket is closed due to an error", ex!.Message);
+ }
+
+ [Test]
+ public async Task TestRetryOperationWithPayloadReusesPooledBufferCorrectly()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new TestRetryPolicy { RetryLimit = 1 }
+ };
+
+ using var server = new FakeServer(reqId => reqId % 2 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ for (int i = 0; i < IterCount; i++)
+ {
+ var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ Assert.IsNotNull(table);
+ }
+ }
+
+ [Test]
+ public async Task TestRetryReadPolicyDoesNotRetryWriteOperations()
+ {
+ var cfg = new IgniteClientConfiguration
+ {
+ RetryPolicy = new RetryReadPolicy()
+ };
+
+ using var server = new FakeServer(reqId => reqId % 2 == 0);
+ using var client = await server.ConnectClientAsync(cfg);
+
+ var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+ Assert.ThrowsAsync<IgniteClientException>(async () => await table!.RecordBinaryView.UpsertAsync(null, new IgniteTuple()));
+ }
+
+ private class TestRetryPolicy : RetryLimitPolicy
+ {
+ private readonly List<IRetryPolicyContext> _invocations = new();
+
+ public IReadOnlyList<IRetryPolicyContext> Invocations => _invocations;
+
+ public override bool ShouldRetry(IRetryPolicyContext context)
+ {
+ _invocations.Add(context);
+
+ return base.ShouldRetry(context);
+ }
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
index 19a08df..19e1656 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
@@ -15,30 +15,28 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite.Tests
{
- using MessagePack;
+ using System.Linq;
+ using Internal;
+ using NUnit.Framework;
/// <summary>
- /// MessagePack utils.
+ /// Tests for <see cref="RetryReadPolicy"/>.
/// </summary>
- internal static class MessagePackUtil
+ public class RetryReadPolicyTests
{
- /// <summary>
- /// Gets the write size for the specified value.
- /// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
+ [Test]
+ public void TestRetryReadPolicySupportsAllOperations()
{
- return value switch
+ var opTypes = typeof(ClientOperationType).GetEnumValues().Cast<ClientOperationType>().ToList();
+
+ foreach (var opType in opTypes)
{
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
+ var ctx = new RetryPolicyContext(new(), opType, 1, new());
+
+ Assert.DoesNotThrow(() => RetryReadPolicy.Instance.ShouldRetry(ctx), opType.ToString());
+ }
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
index f18df9f..b880d17 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
@@ -54,9 +54,9 @@ namespace Apache.Ignite.Tests.Table.Serialization
[Test]
public void TestWriteUnsigned()
{
- var pooledWriter = Write(new UnsignedPoco(ulong.MaxValue, "foo"));
+ var bytes = Write(new UnsignedPoco(ulong.MaxValue, "foo"));
- var resMem = pooledWriter.GetWrittenMemory()[4..]; // Skip length header.
+ var resMem = bytes[PooledArrayBufferWriter.ReservedPrefixSize..]; // Skip length header.
var reader = new MessagePackReader(resMem);
Assert.AreEqual(ulong.MaxValue, reader.ReadUInt64());
@@ -132,13 +132,13 @@ namespace Apache.Ignite.Tests.Table.Serialization
private static MessagePackReader WriteAndGetReader(bool keyOnly = false)
{
- var pooledWriter = Write(new Poco { Key = 1234, Val = "foo" }, keyOnly);
+ var bytes = Write(new Poco { Key = 1234, Val = "foo" }, keyOnly);
- var resMem = pooledWriter.GetWrittenMemory()[4..]; // Skip length header.
+ var resMem = bytes[PooledArrayBufferWriter.ReservedPrefixSize..]; // Skip length header.
return new MessagePackReader(resMem);
}
- private static PooledArrayBufferWriter Write<T>(T obj, bool keyOnly = false)
+ private static byte[] Write<T>(T obj, bool keyOnly = false)
where T : class
{
var handler = new ObjectSerializerHandler<T>();
@@ -148,7 +148,7 @@ namespace Apache.Ignite.Tests.Table.Serialization
handler.Write(ref writer, Schema, obj, keyOnly);
writer.Flush();
- return pooledWriter;
+ return pooledWriter.GetWrittenMemory().ToArray();
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
new file mode 100644
index 0000000..8ea75bb
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite
+{
+ using Table;
+
+ /// <summary>
+ /// Client operation type.
+ /// </summary>
+ public enum ClientOperationType
+ {
+ /// <summary>
+ /// Get tables (<see cref="ITables.GetTablesAsync"/>).
+ /// </summary>
+ TablesGet,
+
+ /// <summary>
+ /// Get table (<see cref="ITables.GetTableAsync"/>).
+ /// </summary>
+ TableGet,
+
+ /// <summary>
+ /// Upsert (<see cref="IRecordView{T}.UpsertAsync"/>).
+ /// </summary>
+ TupleUpsert,
+
+ /// <summary>
+ /// Get (<see cref="IRecordView{T}.GetAsync"/>).
+ /// </summary>
+ TupleGet,
+
+ /// <summary>
+ /// Upsert (<see cref="IRecordView{T}.UpsertAllAsync"/>).
+ /// </summary>
+ TupleUpsertAll,
+
+ /// <summary>
+ /// Get All (<see cref="IRecordView{T}.GetAllAsync"/>).
+ /// </summary>
+ TupleGetAll,
+
+ /// <summary>
+ /// Get and Upsert (<see cref="IRecordView{T}.GetAndUpsertAsync"/>).
+ /// </summary>
+ TupleGetAndUpsert,
+
+ /// <summary>
+ /// Insert (<see cref="IRecordView{T}.InsertAsync"/>).
+ /// </summary>
+ TupleInsert,
+
+ /// <summary>
+ /// Insert All (<see cref="IRecordView{T}.InsertAllAsync"/>).
+ /// </summary>
+ TupleInsertAll,
+
+ /// <summary>
+ /// Replace (<see cref="IRecordView{T}.ReplaceAsync(Apache.Ignite.Transactions.ITransaction?,T)"/>).
+ /// </summary>
+ TupleReplace,
+
+ /// <summary>
+ /// Replace Exact (<see cref="IRecordView{T}.ReplaceAsync(Apache.Ignite.Transactions.ITransaction?,T, T)"/>).
+ /// </summary>
+ TupleReplaceExact,
+
+ /// <summary>
+ /// Get and Replace (<see cref="IRecordView{T}.GetAndReplaceAsync"/>).
+ /// </summary>
+ TupleGetAndReplace,
+
+ /// <summary>
+ /// Delete (<see cref="IRecordView{T}.DeleteAsync"/>).
+ /// </summary>
+ TupleDelete,
+
+ /// <summary>
+ /// Delete All (<see cref="IRecordView{T}.DeleteAllAsync"/>).
+ /// </summary>
+ TupleDeleteAll,
+
+ /// <summary>
+ /// Delete Exact (<see cref="IRecordView{T}.DeleteExactAsync"/>).
+ /// </summary>
+ TupleDeleteExact,
+
+ /// <summary>
+ /// Delete All Exact (<see cref="IRecordView{T}.DeleteAllExactAsync"/>).
+ /// </summary>
+ TupleDeleteAllExact,
+
+ /// <summary>
+ /// Get and Delete (<see cref="IRecordView{T}.GetAndDeleteAsync"/>).
+ /// </summary>
+ TupleGetAndDelete
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicy.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/IRetryPolicy.cs
index 19a08df..c814909 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicy.cs
@@ -15,30 +15,22 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
{
- using MessagePack;
-
/// <summary>
- /// MessagePack utils.
+ /// Client retry policy determines whether client operations that have failed due to a connection issue
+ /// should be retried.
/// </summary>
- internal static class MessagePackUtil
+ public interface IRetryPolicy
{
/// <summary>
- /// Gets the write size for the specified value.
+ /// Gets a value indicating whether a client operation that has failed due to a connection issue
+ /// should be retried.
/// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
- {
- return value switch
- {
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
- }
+ /// <param name="context">Operation context.</param>
+ /// <returns>
+ /// <c>true</c> if the operation should be retried on another connection, <c>false</c> otherwise.
+ /// </returns>
+ bool ShouldRetry(IRetryPolicyContext context);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicyContext.cs
similarity index 57%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/IRetryPolicyContext.cs
index 19a08df..c7e9706 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicyContext.cs
@@ -15,30 +15,33 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
{
- using MessagePack;
+ using System;
/// <summary>
- /// MessagePack utils.
+ /// Retry policy context. See <see cref="IRetryPolicy.ShouldRetry"/>.
/// </summary>
- internal static class MessagePackUtil
+ public interface IRetryPolicyContext
{
/// <summary>
- /// Gets the write size for the specified value.
+ /// Gets the client configuration.
/// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
- {
- return value switch
- {
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
- }
+ IgniteClientConfiguration Configuration { get; }
+
+ /// <summary>
+ /// Gets the operation type.
+ /// </summary>
+ ClientOperationType Operation { get; }
+
+ /// <summary>
+ /// Gets the current iteration.
+ /// </summary>
+ int Iteration { get; }
+
+ /// <summary>
+ /// Gets the exception that caused current retry iteration.
+ /// </summary>
+ Exception Exception { get; }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
index 83651c5..fd6981b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
@@ -73,6 +73,7 @@ namespace Apache.Ignite
Logger = other.Logger;
SocketTimeout = other.SocketTimeout;
Endpoints = other.Endpoints.ToList();
+ RetryPolicy = other.RetryPolicy;
}
/// <summary>
@@ -97,5 +98,16 @@ namespace Apache.Ignite
/// * my-host.com:780..787 (custom port range).
/// </summary>
public IList<string> Endpoints { get; } = new List<string>();
+
+ /// <summary>
+ /// Gets or sets the retry policy. When a request fails due to a connection error,
+ /// Ignite will retry the request if the specified policy allows it.
+ /// <para />
+ /// Default is <see cref="RetryNonePolicy"/> - does not retry anything.
+ /// <para />
+ /// See also <see cref="RetryAllPolicy"/>, <see cref="RetryReadPolicy"/>, <see cref="RetryNonePolicy"/>,
+ /// <see cref="RetryLimitPolicy.RetryLimit"/>.
+ /// </summary>
+ public IRetryPolicy RetryPolicy { get; set; } = RetryNonePolicy.Instance;
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
index 76e38a6..a296321 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
@@ -40,8 +40,10 @@ namespace Apache.Ignite.Internal.Buffers
/// </summary>
internal sealed class PooledArrayBufferWriter : IBufferWriter<byte>, IDisposable
{
- /** Reserved prefix size. */
- private const int ReservedPrefixSize = 4 + 4 + 9; // Size (4 bytes) + OpCode (4 bytes) + RequestId (9 bytes)/
+ /// <summary>
+ /// Reserved prefix size.
+ /// </summary>
+ public const int ReservedPrefixSize = 4 + 5 + 9; // Size (4 bytes) + OpCode (5 bytes) + RequestId (9 bytes)/
/** Underlying pooled array. */
private byte[] _buffer;
@@ -49,14 +51,8 @@ namespace Apache.Ignite.Internal.Buffers
/** Index within the array. */
private int _index;
- /** Index within the array: backup for prefix writer mode. */
- private int _indexBackup;
-
- /** Start index within the array. */
- private int _startIndex;
-
/** Disposed flag. */
- private bool _disposed;
+ private volatile bool _disposed;
/// <summary>
/// Initializes a new instance of the <see cref="PooledArrayBufferWriter"/> class.
@@ -68,7 +64,6 @@ namespace Apache.Ignite.Internal.Buffers
// https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#buffering
_buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = ReservedPrefixSize;
- _startIndex = _index;
}
/// <summary>
@@ -77,29 +72,14 @@ namespace Apache.Ignite.Internal.Buffers
private int FreeCapacity => _buffer.Length - _index;
/// <summary>
- /// Gets the written memory.
+ /// Gets the written memory, including reserved prefix (see <see cref="ReservedPrefixSize"/>).
/// </summary>
/// <returns>Written array.</returns>
- public unsafe ReadOnlyMemory<byte> GetWrittenMemory()
+ public Memory<byte> GetWrittenMemory()
{
- if (_indexBackup > 0)
- {
- _index = _indexBackup;
- }
-
- // Write big-endian message size to the start of the buffer.
- const int sizeLen = 4;
- Debug.Assert(_startIndex >= sizeLen, "_startIndex >= 4");
-
- var messageSize = _index - _startIndex;
- var startIndex = _startIndex - sizeLen;
+ Debug.Assert(!_disposed, "!_disposed");
- fixed (byte* bufPtr = &_buffer[startIndex])
- {
- *(int*)bufPtr = IPAddress.HostToNetworkOrder(messageSize);
- }
-
- return new(_buffer, startIndex, _index - startIndex);
+ return new(_buffer, start: 0, length: _index);
}
/// <inheritdoc />
@@ -139,22 +119,6 @@ namespace Apache.Ignite.Internal.Buffers
public MessagePackWriter GetMessageWriter() => new(this);
/// <summary>
- /// Gets the <see cref="MessagePackWriter"/> for this buffer prefix.
- /// </summary>
- /// <param name="prefixSize">Prefix size.</param>
- /// <returns><see cref="MessagePackWriter"/> for this buffer.</returns>
- public MessagePackWriter GetPrefixWriter(int prefixSize)
- {
- Debug.Assert(prefixSize < _startIndex, "prefixSize < _startIndex");
-
- _indexBackup = _index;
- _startIndex -= prefixSize;
- _index = _startIndex;
-
- return new(this);
- }
-
- /// <summary>
/// Reserves space for an int32 value and returns its position.
/// </summary>
/// <returns>Reserved int position. To be used with <see cref="WriteInt32"/>.</returns>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index ec025a8..4cfc7a1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -98,9 +98,25 @@ namespace Apache.Ignite.Internal
/// <returns>Response data.</returns>
public async Task<PooledBuffer> DoOutInOpAsync(ClientOp clientOp, PooledArrayBufferWriter? request = null)
{
- var socket = await GetSocketAsync().ConfigureAwait(false);
+ var attempt = 0;
+ List<Exception>? errors = null;
- return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+ while (true)
+ {
+ try
+ {
+ var socket = await GetSocketAsync().ConfigureAwait(false);
+
+ return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+ }
+ catch (Exception e)
+ {
+ if (!HandleOpError(e, clientOp, ref attempt, ref errors))
+ {
+ throw;
+ }
+ }
+ }
}
/// <inheritdoc/>
@@ -252,5 +268,90 @@ namespace Apache.Ignite.Internal
throw;
}
}
+
+ /// <summary>
+ /// Gets a value indicating whether a failed operation should be retried.
+ /// </summary>
+ /// <param name="exception">Exception that caused the operation to fail.</param>
+ /// <param name="op">Operation code.</param>
+ /// <param name="attempt">Current attempt.</param>
+ /// <returns>
+ /// <c>true</c> if the operation should be retried on another connection, <c>false</c> otherwise.
+ /// </returns>
+ private bool ShouldRetry(Exception exception, ClientOp op, int attempt)
+ {
+ var e = exception;
+
+ while (e != null && !(e is SocketException))
+ {
+ e = e.InnerException;
+ }
+
+ if (e == null)
+ {
+ // Only retry socket exceptions.
+ return false;
+ }
+
+ if (Configuration.RetryPolicy is RetryNonePolicy)
+ {
+ return false;
+ }
+
+ var publicOpType = op.ToPublicOperationType();
+
+ if (publicOpType == null)
+ {
+ // System operation.
+ return true;
+ }
+
+ var ctx = new RetryPolicyContext(new(Configuration), publicOpType.Value, attempt, exception);
+
+ return Configuration.RetryPolicy.ShouldRetry(ctx);
+ }
+
+ /// <summary>
+ /// Handles operation error.
+ /// </summary>
+ /// <param name="exception">Error.</param>
+ /// <param name="op">Operation code.</param>
+ /// <param name="attempt">Current attempt.</param>
+ /// <param name="errors">Previous errors.</param>
+ /// <returns>True if the error was handled, false otherwise.</returns>
+ private bool HandleOpError(
+ Exception exception,
+ ClientOp op,
+ ref int attempt,
+ ref List<Exception>? errors)
+ {
+ if (!ShouldRetry(exception, op, attempt))
+ {
+ if (errors == null)
+ {
+ return false;
+ }
+
+ errors.Add(exception);
+ var inner = new AggregateException(errors);
+
+ throw new IgniteClientException(
+ $"Operation failed after {attempt} retries, examine InnerException for details.",
+ inner);
+ }
+
+ if (errors == null)
+ {
+ errors = new List<Exception> { exception };
+ }
+ else
+ {
+ errors.Add(exception);
+ }
+
+ attempt++;
+
+ return true;
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 9d497e6..d362fe0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Internal
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
@@ -31,8 +32,6 @@ namespace Apache.Ignite.Internal
using MessagePack;
using Proto;
- using static Proto.MessagePackUtil;
-
/// <summary>
/// Wrapper over framework socket for Ignite thin client operations.
/// </summary>
@@ -71,9 +70,15 @@ namespace Apache.Ignite.Internal
/** Logger. */
private readonly IIgniteLogger? _logger;
+ /** Pre-allocated buffer for message size + op code + request id. To be used under <see cref="_sendLock"/>. */
+ private readonly byte[] _prefixBuffer = new byte[PooledArrayBufferWriter.ReservedPrefixSize];
+
/** Request id generator. */
private long _requestId;
+ /** Exception that caused this socket to close. */
+ private volatile Exception? _exception;
+
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket"/> class.
/// </summary>
@@ -141,6 +146,13 @@ namespace Apache.Ignite.Internal
/// <returns>Response data.</returns>
public Task<PooledBuffer> DoOutInOpAsync(ClientOp clientOp, PooledArrayBufferWriter? request = null)
{
+ var ex = _exception;
+
+ if (ex != null)
+ {
+ throw new IgniteClientException("Socket is closed due to an error, examine inner exception for details.", ex);
+ }
+
if (_disposeTokenSource.IsCancellationRequested)
{
throw new ObjectDisposedException(nameof(ClientSocket));
@@ -148,15 +160,11 @@ namespace Apache.Ignite.Internal
var requestId = Interlocked.Increment(ref _requestId);
- request ??= new PooledArrayBufferWriter(32);
-
- WritePrefix(request, clientOp, requestId);
-
var taskCompletionSource = new TaskCompletionSource<PooledBuffer>();
_requests[requestId] = taskCompletionSource;
- SendRequestAsync(request)
+ SendRequestAsync(request, clientOp, requestId)
.AsTask()
.ContinueWith(
(task, state) =>
@@ -183,8 +191,7 @@ namespace Apache.Ignite.Internal
/// <inheritdoc/>
public void Dispose()
{
- _disposeTokenSource.Cancel();
- _stream.Dispose();
+ Dispose(null);
}
/// <summary>
@@ -210,7 +217,7 @@ namespace Apache.Ignite.Internal
try
{
- await stream.ReadAsync(responseMagic.AsMemory(0, ProtoCommon.MagicBytes.Length)).ConfigureAwait(false);
+ await ReceiveBytesAsync(stream, responseMagic, ProtoCommon.MagicBytes.Length, CancellationToken.None).ConfigureAwait(false);
for (var i = 0; i < ProtoCommon.MagicBytes.Length; i++)
{
@@ -272,7 +279,7 @@ namespace Apache.Ignite.Internal
try
{
- await stream.ReadAsync(bytes.AsMemory(0, size), cancellationToken).ConfigureAwait(false);
+ await ReceiveBytesAsync(stream, bytes, size, cancellationToken).ConfigureAwait(false);
return new PooledBuffer(bytes, 0, size);
}
@@ -292,11 +299,35 @@ namespace Apache.Ignite.Internal
const int messageSizeByteCount = 4;
Debug.Assert(buffer.Length >= messageSizeByteCount, "buffer.Length >= messageSizeByteCount");
- await stream.ReadAsync(buffer.AsMemory(0, messageSizeByteCount), cancellationToken).ConfigureAwait(false);
+ await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, cancellationToken).ConfigureAwait(false);
return GetMessageSize(buffer);
}
+ private static async Task ReceiveBytesAsync(
+ NetworkStream stream,
+ byte[] buffer,
+ int size,
+ CancellationToken cancellationToken)
+ {
+ int received = 0;
+
+ while (received < size)
+ {
+ var res = await stream.ReadAsync(buffer.AsMemory(received, size - received), cancellationToken).ConfigureAwait(false);
+
+ if (res == 0)
+ {
+ // Disconnected.
+ throw new IgniteClientException(
+ "Connection lost (failed to read data from socket)",
+ new SocketException((int) SocketError.ConnectionAborted));
+ }
+
+ received += res;
+ }
+ }
+
private static unsafe int GetMessageSize(byte[] responseLenBytes)
{
fixed (byte* len = &responseLenBytes[0])
@@ -312,7 +343,13 @@ namespace Apache.Ignite.Internal
using var bufferWriter = new PooledArrayBufferWriter();
WriteHandshake(version, bufferWriter.GetMessageWriter());
- await stream.WriteAsync(bufferWriter.GetWrittenMemory()).ConfigureAwait(false);
+ // Prepend size.
+ var buf = bufferWriter.GetWrittenMemory();
+ var size = buf.Length - PooledArrayBufferWriter.ReservedPrefixSize;
+ var resBuf = buf.Slice(PooledArrayBufferWriter.ReservedPrefixSize - 4);
+ WriteMessageSize(resBuf, size);
+
+ await stream.WriteAsync(resBuf).ConfigureAwait(false);
}
private static void WriteHandshake(ClientProtocolVersion version, MessagePackWriter w)
@@ -330,46 +367,79 @@ namespace Apache.Ignite.Internal
w.Flush();
}
- private static void WritePrefix(PooledArrayBufferWriter writer, ClientOp clientOp, long requestId)
+ private static unsafe void WriteMessageSize(Memory<byte> target, int size)
{
- var writeSize = GetWriteSize((ulong)clientOp) +
- GetWriteSize((ulong)requestId);
-
- var w = writer.GetPrefixWriter(writeSize);
-
- w.Write((int)clientOp);
- w.Write(requestId);
-
- w.Flush();
+ fixed (byte* bufPtr = target.Span)
+ {
+ *(int*)bufPtr = IPAddress.HostToNetworkOrder(size);
+ }
}
- private async ValueTask SendRequestAsync(PooledArrayBufferWriter request)
+ private async ValueTask SendRequestAsync(PooledArrayBufferWriter? request, ClientOp op, long requestId)
{
await _sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
try
{
- await _stream.WriteAsync(request.GetWrittenMemory(), _disposeTokenSource.Token).ConfigureAwait(false);
+ var prefixMem = _prefixBuffer.AsMemory()[4..];
+ var prefixSize = MessagePackUtil.WriteUnsigned(prefixMem, (int)op);
+ prefixSize += MessagePackUtil.WriteUnsigned(prefixMem[prefixSize..], requestId);
+
+ if (request != null)
+ {
+ var requestBuf = request.GetWrittenMemory();
+
+ WriteMessageSize(_prefixBuffer, prefixSize + requestBuf.Length - PooledArrayBufferWriter.ReservedPrefixSize);
+ var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
+
+ var requestBufStart = PooledArrayBufferWriter.ReservedPrefixSize - prefixBytes.Length;
+ var requestBufWithPrefix = requestBuf.Slice(requestBufStart);
+
+ // Copy prefix to request buf to avoid extra WriteAsync call for the prefix.
+ prefixBytes.CopyTo(requestBufWithPrefix);
+
+ await _stream.WriteAsync(requestBufWithPrefix, _disposeTokenSource.Token).ConfigureAwait(false);
+ }
+ else
+ {
+ // Request without body, send only the prefix.
+ WriteMessageSize(_prefixBuffer, prefixSize);
+ var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
+ await _stream.WriteAsync(prefixBytes, _disposeTokenSource.Token).ConfigureAwait(false);
+ }
}
finally
{
_sendLock.Release();
- request.Dispose(); // Release pooled buffer as soon as possible.
}
}
+ [SuppressMessage(
+ "Microsoft.Design",
+ "CA1031:DoNotCatchGeneralExceptionTypes",
+ Justification = "Any exception in receive loop should be handled.")]
private async Task RunReceiveLoop(CancellationToken cancellationToken)
{
// Reuse the same array for all responses.
var messageSizeBytes = new byte[4];
- while (!cancellationToken.IsCancellationRequested)
+ try
{
- PooledBuffer response = await ReadResponseAsync(_stream, messageSizeBytes, cancellationToken).ConfigureAwait(false);
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ PooledBuffer response = await ReadResponseAsync(_stream, messageSizeBytes, cancellationToken).ConfigureAwait(false);
+
+ // Invoke response handler in another thread to continue the receive loop.
+ // Response buffer should be disposed by the task handler.
+ ThreadPool.QueueUserWorkItem(r => HandleResponse((PooledBuffer)r), response);
+ }
+ }
+ catch (Exception e)
+ {
+ const string message = "Exception while reading from socket. Connection closed.";
- // Invoke response handler in another thread to continue the receive loop.
- // Response buffer should be disposed by the task handler.
- ThreadPool.QueueUserWorkItem(r => HandleResponse((PooledBuffer)r), response);
+ _logger?.Error(message, e);
+ Dispose(new IgniteClientException(message, e));
}
}
@@ -389,8 +459,9 @@ namespace Apache.Ignite.Internal
if (!_requests.TryRemove(requestId, out var taskCompletionSource))
{
- _logger?.Error($"Unexpected response ID ({requestId}) received from the server, closing the socket.");
- Dispose();
+ var message = $"Unexpected response ID ({requestId}) received from the server, closing the socket.";
+ _logger?.Error(message);
+ Dispose(new IgniteClientException(message));
return;
}
@@ -409,5 +480,34 @@ namespace Apache.Ignite.Internal
taskCompletionSource.SetResult(resultBuffer);
}
}
+
+ /// <summary>
+ /// Disposes this socket and completes active requests with the specified exception.
+ /// </summary>
+ /// <param name="ex">Exception that caused this socket to close. Null when socket is closed by the user.</param>
+ private void Dispose(Exception? ex)
+ {
+ if (_disposeTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+
+ _disposeTokenSource.Cancel();
+ _exception = ex;
+ _stream.Dispose();
+
+ ex ??= new ObjectDisposedException("Connection closed.");
+
+ while (!_requests.IsEmpty)
+ {
+ foreach (var reqId in _requests.Keys.ToArray())
+ {
+ if (_requests.TryRemove(reqId, out var req) && req != null)
+ {
+ req.TrySetException(ex);
+ }
+ }
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 9998af6..677ece1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -22,12 +22,6 @@ namespace Apache.Ignite.Internal.Proto
/// </summary>
internal enum ClientOp
{
- /** Create table. */
- TableCreate = 1,
-
- /** Drop table. */
- TableDrop = 2,
-
/** Get tables. */
TablesGet = 3,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
new file mode 100644
index 0000000..ba822e3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto
+{
+ using System;
+
+ /// <summary>
+ /// Extensions for <see cref="ClientOp"/>.
+ /// </summary>
+ internal static class ClientOpExtensions
+ {
+ /// <summary>
+ /// Converts the internal op code to a public operation type.
+ /// </summary>
+ /// <param name="op">Operation code.</param>
+ /// <returns>Operation type.</returns>
+ public static ClientOperationType? ToPublicOperationType(this ClientOp op)
+ {
+ return op switch
+ {
+ ClientOp.TablesGet => ClientOperationType.TablesGet,
+ ClientOp.TableGet => ClientOperationType.TableGet,
+ ClientOp.SchemasGet => null,
+ ClientOp.TupleUpsert => ClientOperationType.TupleUpsert,
+ ClientOp.TupleGet => ClientOperationType.TupleGet,
+ ClientOp.TupleUpsertAll => ClientOperationType.TupleUpsertAll,
+ ClientOp.TupleGetAll => ClientOperationType.TupleGetAll,
+ ClientOp.TupleGetAndUpsert => ClientOperationType.TupleGetAndUpsert,
+ ClientOp.TupleInsert => ClientOperationType.TupleInsert,
+ ClientOp.TupleInsertAll => ClientOperationType.TupleInsertAll,
+ ClientOp.TupleReplace => ClientOperationType.TupleReplace,
+ ClientOp.TupleReplaceExact => ClientOperationType.TupleReplaceExact,
+ ClientOp.TupleGetAndReplace => ClientOperationType.TupleGetAndReplace,
+ ClientOp.TupleDelete => ClientOperationType.TupleDelete,
+ ClientOp.TupleDeleteAll => ClientOperationType.TupleDeleteAll,
+ ClientOp.TupleDeleteExact => ClientOperationType.TupleDeleteExact,
+ ClientOp.TupleDeleteAllExact => ClientOperationType.TupleDeleteAllExact,
+ ClientOp.TupleGetAndDelete => ClientOperationType.TupleGetAndDelete,
+ ClientOp.TxBegin => null,
+ ClientOp.TxCommit => null,
+ ClientOp.TxRollback => null,
+ _ => throw new ArgumentOutOfRangeException(nameof(op), op, message: null)
+ };
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
index 19a08df..043d9a8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Internal.Proto
{
+ using System;
using MessagePack;
/// <summary>
@@ -25,20 +26,63 @@ namespace Apache.Ignite.Internal.Proto
internal static class MessagePackUtil
{
/// <summary>
- /// Gets the write size for the specified value.
+ /// Writes an unsigned value to specified memory location and returns number of bytes written.
/// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
+ /// <param name="mem">Memory.</param>
+ /// <param name="val">Value.</param>
+ /// <returns>Bytes written.</returns>
+ public static int WriteUnsigned(Memory<byte> mem, long val)
{
- return value switch
+ unchecked
{
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
+ var span = mem.Span;
+
+ if (val <= MessagePackRange.MaxFixPositiveInt)
+ {
+ span[0] = (byte)val;
+ return 1;
+ }
+
+ if (val <= byte.MaxValue)
+ {
+ span[0] = MessagePackCode.UInt8;
+ span[1] = (byte)val;
+
+ return 2;
+ }
+
+ if (val <= ushort.MaxValue)
+ {
+ span[0] = MessagePackCode.UInt16;
+ span[2] = (byte)val;
+ span[1] = (byte)(val >> 8);
+
+ return 3;
+ }
+
+ if (val <= uint.MaxValue)
+ {
+ span[0] = MessagePackCode.UInt32;
+ span[4] = (byte)val;
+ span[3] = (byte)(val >> 8);
+ span[2] = (byte)(val >> 16);
+ span[1] = (byte)(val >> 24);
+
+ return 5;
+ }
+
+ span[0] = MessagePackCode.UInt64;
+ span[8] = (byte)val;
+ span[7] = (byte)(val >> 8);
+ span[6] = (byte)(val >> 16);
+ span[5] = (byte)(val >> 24);
+ span[4] = (byte)(val >> 32);
+ span[3] = (byte)(val >> 40);
+ span[2] = (byte)(val >> 48);
+ span[1] = (byte)(val >> 56);
+
+ return 9;
+ }
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/RetryPolicyContext.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/RetryPolicyContext.cs
new file mode 100644
index 0000000..8987b57
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/RetryPolicyContext.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal
+{
+ using System;
+
+ /// <summary>
+ /// Retry policy context.
+ /// </summary>
+ internal class RetryPolicyContext : IRetryPolicyContext
+ {
+ /// <summary>
+ /// Initializes a new instance of the <see cref="RetryPolicyContext"/> class.
+ /// </summary>
+ /// <param name="configuration">Configuration.</param>
+ /// <param name="operation">Operation.</param>
+ /// <param name="iteration">Iteration.</param>
+ /// <param name="exception">Exception.</param>
+ public RetryPolicyContext(
+ IgniteClientConfiguration configuration,
+ ClientOperationType operation,
+ int iteration,
+ Exception exception)
+ {
+ Configuration = configuration;
+ Operation = operation;
+ Iteration = iteration;
+ Exception = exception;
+ }
+
+ /// <inheritdoc/>
+ public IgniteClientConfiguration Configuration { get; }
+
+ /// <inheritdoc/>
+ public ClientOperationType Operation { get; }
+
+ /// <inheritdoc/>
+ public int Iteration { get; }
+
+ /// <inheritdoc/>
+ public Exception Exception { get; }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 6b69f9e..5f1edea 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -280,9 +280,19 @@ namespace Apache.Ignite.Internal.Table
Transaction? tx,
PooledArrayBufferWriter? request = null)
{
- var socket = await _table.GetSocket(tx).ConfigureAwait(false);
+ if (tx == null)
+ {
+ // Use failover socket with reconnect and retry behavior.
+ return await _table.Socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+ }
+
+ if (tx.FailoverSocket != _table.Socket)
+ {
+ throw new IgniteClientException("Specified transaction belongs to a different IgniteClient instance.");
+ }
- return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+ // Use tx-specific socket without retry and failover.
+ return await tx.Socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
}
private async Task<PooledBuffer> DoRecordOutOpAsync(
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index f8d516f..943c2d4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -73,6 +73,11 @@ namespace Apache.Ignite.Internal.Table
/// <inheritdoc/>
public IRecordView<IIgniteTuple> RecordBinaryView { get; }
+ /// <summary>
+ /// Gets the associated socket.
+ /// </summary>
+ internal ClientFailoverSocket Socket => _socket;
+
/// <inheritdoc/>
public IRecordView<T> GetRecordView<T>()
where T : class
@@ -103,26 +108,6 @@ namespace Apache.Ignite.Internal.Table
}
/// <summary>
- /// Gets the socket.
- /// </summary>
- /// <param name="tx">Transaction.</param>
- /// <returns>Socket.</returns>
- internal ValueTask<ClientSocket> GetSocket(Transactions.Transaction? tx)
- {
- if (tx == null)
- {
- return _socket.GetSocketAsync();
- }
-
- if (tx.FailoverSocket != _socket)
- {
- throw new IgniteClientException("Specified transaction belongs to a different IgniteClient instance.");
- }
-
- return new ValueTask<ClientSocket>(tx.Socket);
- }
-
- /// <summary>
/// Reads the schema.
/// </summary>
/// <param name="buf">Buffer.</param>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
index 19a08df..88416f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
@@ -15,30 +15,16 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
{
- using MessagePack;
-
/// <summary>
- /// MessagePack utils.
+ /// Retry policy that always returns <c>true</c>.
/// </summary>
- internal static class MessagePackUtil
+ public sealed class RetryAllPolicy : RetryLimitPolicy
{
/// <summary>
- /// Gets the write size for the specified value.
+ /// Singleton instance.
/// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
- {
- return value switch
- {
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
- }
+ public static readonly RetryAllPolicy Instance = new();
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
index 19a08df..d987228 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
@@ -15,30 +15,32 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
{
- using MessagePack;
+ using Internal.Common;
/// <summary>
- /// MessagePack utils.
+ /// Retry policy that returns <c>true</c> when <see cref="IRetryPolicyContext.Iteration"/> is less than
+ /// the specified <see cref="RetryLimit"/>.
/// </summary>
- internal static class MessagePackUtil
+ public class RetryLimitPolicy : IRetryPolicy
{
/// <summary>
- /// Gets the write size for the specified value.
+ /// Gets or sets the retry limit. 0 or less for no limit.
/// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
+ public int RetryLimit { get; set; }
+
+ /// <inheritdoc />
+ public virtual bool ShouldRetry(IRetryPolicyContext context)
{
- return value switch
+ IgniteArgumentCheck.NotNull(context, nameof(context));
+
+ if (RetryLimit <= 0)
{
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
+ return true;
+ }
+
+ return context.Iteration < RetryLimit;
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/RetryNonePolicy.cs
similarity index 60%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/RetryNonePolicy.cs
index 19a08df..d335621 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryNonePolicy.cs
@@ -15,30 +15,22 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
{
- using MessagePack;
-
/// <summary>
- /// MessagePack utils.
+ /// Retry policy that always returns false.
/// </summary>
- internal static class MessagePackUtil
+ public sealed class RetryNonePolicy : IRetryPolicy
{
/// <summary>
- /// Gets the write size for the specified value.
+ /// Singleton instance.
/// </summary>
- /// <param name="value">Value.</param>
- /// <returns>MessagePack write size.</returns>
- public static int GetWriteSize(ulong value)
+ public static readonly RetryNonePolicy Instance = new();
+
+ /// <inheritdoc />
+ public bool ShouldRetry(IRetryPolicyContext context)
{
- return value switch
- {
- <= MessagePackRange.MaxFixPositiveInt => 1,
- <= byte.MaxValue => 2,
- <= ushort.MaxValue => 3,
- <= uint.MaxValue => 5,
- _ => 9
- };
+ return false;
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
new file mode 100644
index 0000000..fcf3fa5
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite
+{
+ using System;
+ using Internal.Common;
+
+ /// <summary>
+ /// Retry policy that returns true for all read-only operations that do not modify data.
+ /// </summary>
+ public sealed class RetryReadPolicy : RetryLimitPolicy
+ {
+ /// <summary>
+ /// Singleton instance.
+ /// </summary>
+ public static readonly RetryReadPolicy Instance = new();
+
+ /// <inheritdoc />
+ public override bool ShouldRetry(IRetryPolicyContext context)
+ {
+ IgniteArgumentCheck.NotNull(context, nameof(context));
+
+ if (!base.ShouldRetry(context))
+ {
+ return false;
+ }
+
+ return context.Operation switch
+ {
+ ClientOperationType.TablesGet => true,
+ ClientOperationType.TableGet => true,
+ ClientOperationType.TupleUpsert => false,
+ ClientOperationType.TupleGet => true,
+ ClientOperationType.TupleUpsertAll => false,
+ ClientOperationType.TupleGetAll => true,
+ ClientOperationType.TupleGetAndUpsert => false,
+ ClientOperationType.TupleInsert => false,
+ ClientOperationType.TupleInsertAll => false,
+ ClientOperationType.TupleReplace => false,
+ ClientOperationType.TupleReplaceExact => false,
+ ClientOperationType.TupleGetAndReplace => false,
+ ClientOperationType.TupleDelete => false,
+ ClientOperationType.TupleDeleteAll => false,
+ ClientOperationType.TupleDeleteExact => false,
+ ClientOperationType.TupleDeleteAllExact => false,
+ ClientOperationType.TupleGetAndDelete => false,
+ var unsupported => throw new NotSupportedException("Unsupported operation type: " + unsupported)
+ };
+ }
+ }
+}