You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/15 06:35:59 UTC

[ignite-3] branch main updated: IGNITE-16222 .NET: Thin 3.0: Add RetryPolicy (#719)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f8f4b2a  IGNITE-16222 .NET: Thin 3.0: Add RetryPolicy (#719)
f8f4b2a is described below

commit f8f4b2a361419d43b91b8ee2ce4b5c85883f3aae
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue Mar 15 09:35:51 2022 +0300

    IGNITE-16222 .NET: Thin 3.0: Add RetryPolicy (#719)
    
    * Add `IClientRetryPolicy` interface.
    * Add predefined policies: `RetryAllPolicy`, `RetryReadPolicy`, `RetryNonePolicy`, `RetryLimitPolicy`.
    * Add `IgniteClientConfiguration.RetryPolicy` property, defaults to `RetryNonePolicy` (no retries by default).
    * Refactor `PooledArrayBufferWriter` to allow sending the same serialized request multiple times.
    * Fix table API to use `FailoverSocket` for non-transactional operations to leverage retry/reconnect.
    
    https://cwiki.apache.org/confluence/display/IGNITE/IEP-82+Thin+Client+Retry+Policy
---
 .../Buffers/PooledArrayBufferWriterTests.cs        |  66 +------
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       | 201 +++++++++++++++++++
 .../dotnet/Apache.Ignite.Tests/FakeServerTests.cs  |  75 +++++++
 .../Apache.Ignite.Tests/ProjectFilesTests.cs       |   1 +
 .../Proto/ClientOpExtensionsTest.cs}               |  30 ++-
 .../Proto/MessagePackExtensionsTest.cs             |   8 +-
 .../dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs | 218 +++++++++++++++++++++
 .../RetryReadPolicyTests.cs}                       |  32 ++-
 .../Serialization/ObjectSerializerHandlerTests.cs  |  12 +-
 .../dotnet/Apache.Ignite/ClientOperationType.cs    | 112 +++++++++++
 .../Proto/MessagePackUtil.cs => IRetryPolicy.cs}   |  30 ++-
 .../MessagePackUtil.cs => IRetryPolicyContext.cs}  |  39 ++--
 .../Apache.Ignite/IgniteClientConfiguration.cs     |  12 ++
 .../Internal/Buffers/PooledArrayBufferWriter.cs    |  54 +----
 .../Apache.Ignite/Internal/ClientFailoverSocket.cs | 105 +++++++++-
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  | 166 ++++++++++++----
 .../Apache.Ignite/Internal/Proto/ClientOp.cs       |   6 -
 .../Internal/Proto/ClientOpExtensions.cs           |  61 ++++++
 .../Internal/Proto/MessagePackUtil.cs              |  66 +++++--
 .../Apache.Ignite/Internal/RetryPolicyContext.cs   |  58 ++++++
 .../Apache.Ignite/Internal/Table/RecordView.cs     |  14 +-
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   |  25 +--
 .../Proto/MessagePackUtil.cs => RetryAllPolicy.cs} |  24 +--
 .../MessagePackUtil.cs => RetryLimitPolicy.cs}     |  32 +--
 .../MessagePackUtil.cs => RetryNonePolicy.cs}      |  26 +--
 .../dotnet/Apache.Ignite/RetryReadPolicy.cs        |  66 +++++++
 26 files changed, 1227 insertions(+), 312 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
index 15cc80a..b281748 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
@@ -26,78 +26,18 @@ namespace Apache.Ignite.Tests.Buffers
     public class PooledArrayBufferWriterTests
     {
         [Test]
-        public void TestBufferWriterPrependsMessageLength()
+        public void TestBufferWriterReservesPrefixSpace()
         {
             using var bufferWriter = new PooledArrayBufferWriter();
-
-            // With payload.
             var writer = bufferWriter.GetMessageWriter();
 
             writer.Write(1);
             writer.Write("A");
             writer.Flush();
 
-            var res = bufferWriter.GetWrittenMemory().ToArray();
-
-            var expectedBytes = new byte[]
-            {
-                // 4 bytes BE length + 1 byte int + 1 byte fixstr + 1 byte char.
-                0, 0, 0, 3, 1, 0xa1, (byte)'A'
-            };
-
-            CollectionAssert.AreEqual(expectedBytes, res);
-        }
-
-        [Test]
-        public void TestBufferWriterPrependsPrefixAndMessageLength()
-        {
-            using var bufferWriter = new PooledArrayBufferWriter();
-
-            var writer = bufferWriter.GetMessageWriter();
-            writer.Write(1);
-            writer.Write("A");
-            writer.Flush();
-
-            var prefixWriter = bufferWriter.GetPrefixWriter(3);
-            prefixWriter.Write(7);
-            prefixWriter.Write(8);
-            prefixWriter.Write(9);
-            prefixWriter.Flush();
-
-            var res = bufferWriter.GetWrittenMemory().ToArray();
-
-            var expectedBytes = new byte[]
-            {
-                // 4 bytes BE length + 3 bytes prefix + 1 byte int + 1 byte fixstr + 1 byte char.
-                0, 0, 0, 6, 7, 8, 9, 1, 0xa1, (byte)'A'
-            };
-
-            CollectionAssert.AreEqual(expectedBytes, res);
-        }
-
-        [Test]
-        public void TestEmptyBufferWriterPrependsZeroMessageLength()
-        {
-            using var bufferWriter = new PooledArrayBufferWriter();
-
-            var res = bufferWriter.GetWrittenMemory().ToArray();
-
-            CollectionAssert.AreEqual(new byte[] { 0, 0, 0, 0 }, res);
-        }
-
-        [Test]
-        public void TestEmptyBufferWriterPrependsPrefix()
-        {
-            using var bufferWriter = new PooledArrayBufferWriter();
-
-            var writer = bufferWriter.GetPrefixWriter(2);
-            writer.Write(7);
-            writer.Write(8);
-            writer.Flush();
-
-            var res = bufferWriter.GetWrittenMemory().ToArray();
+            var res = bufferWriter.GetWrittenMemory()[PooledArrayBufferWriter.ReservedPrefixSize..].ToArray();
 
-            CollectionAssert.AreEqual(new byte[] { 0, 0, 0, 2, 7, 8 }, res);
+            CollectionAssert.AreEqual(new byte[] { 1, 0xa1, (byte)'A' }, res);
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
new file mode 100644
index 0000000..58ef00d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+    using System;
+    using System.Buffers;
+    using System.Net;
+    using System.Net.Sockets;
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Internal.Proto;
+    using MessagePack;
+
+    /// <summary>
+    /// Fake Ignite server for test purposes.
+    /// </summary>
+    public sealed class FakeServer : IDisposable
+    {
+        public const string Err = "Err!";
+
+        public const string ExistingTableName = "tbl1";
+
+        private readonly Socket _listener;
+
+        private readonly CancellationTokenSource _cts = new();
+
+        private readonly Func<int, bool> _shouldDropConnection;
+
+        public FakeServer(Func<int, bool>? shouldDropConnection = null)
+        {
+            _shouldDropConnection = shouldDropConnection ?? (_ => false);
+            _listener = new Socket(IPAddress.Loopback.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+            _listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
+            _listener.Listen(backlog: 1);
+
+            Task.Run(ListenLoop);
+        }
+
+        public async Task<IIgniteClient> ConnectClientAsync(IgniteClientConfiguration? cfg = null)
+        {
+            var port = ((IPEndPoint)_listener.LocalEndPoint).Port;
+
+            cfg ??= new IgniteClientConfiguration();
+
+            cfg.Endpoints.Clear();
+            cfg.Endpoints.Add("127.0.0.1:" + port);
+
+            return await IgniteClient.StartAsync(cfg);
+        }
+
+        public void Dispose()
+        {
+            _cts.Cancel();
+            _listener.Disconnect(true);
+            _listener.Dispose();
+            _cts.Dispose();
+        }
+
+        private static int ReceiveMessageSize(Socket handler) =>
+            IPAddress.NetworkToHostOrder(BitConverter.ToInt32(ReceiveBytes(handler, 4)));
+
+        private static byte[] ReceiveBytes(Socket socket, int size)
+        {
+            int received = 0;
+            var buf = new byte[size];
+
+            while (received < size)
+            {
+                var res = socket.Receive(buf, received, size - received, SocketFlags.None);
+
+                if (res == 0)
+                {
+                    throw new Exception("Connection lost");
+                }
+
+                received += res;
+            }
+
+            return buf;
+        }
+
+        private void ListenLoop()
+        {
+            int requestCount = 0;
+
+            while (!_cts.IsCancellationRequested)
+            {
+                using Socket handler = _listener.Accept();
+
+                // Read handshake.
+                ReceiveBytes(handler, 4); // Magic.
+                var msgSize = ReceiveMessageSize(handler);
+                ReceiveBytes(handler, msgSize);
+
+                // Write handshake response.
+                handler.Send(ProtoCommon.MagicBytes);
+                handler.Send(new byte[] { 0, 0, 0, 7 }); // Size.
+                handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128 });
+
+                while (!_cts.IsCancellationRequested)
+                {
+                    msgSize = ReceiveMessageSize(handler);
+                    var msg = ReceiveBytes(handler, msgSize);
+
+                    if (_shouldDropConnection(++requestCount))
+                    {
+                        break;
+                    }
+
+                    // Assume fixint8.
+                    var opCode = (ClientOp)msg[0];
+                    var requestId = msg[1];
+
+                    if (opCode == ClientOp.TablesGet)
+                    {
+                        handler.Send(new byte[] { 0, 0, 0, 4 }); // Size.
+                        handler.Send(new byte[] { 0, requestId, 0, 128 }); // Empty map.
+
+                        continue;
+                    }
+
+                    if (opCode == ClientOp.TableGet)
+                    {
+                        var reader = new MessagePackReader(msg.AsMemory()[2..]);
+                        var tableName = reader.ReadString();
+
+                        if (tableName == ExistingTableName)
+                        {
+                            handler.Send(new byte[] { 0, 0, 0, 21 }); // Size.
+                            handler.Send(new byte[] { 0, requestId, 0 });
+
+                            var arrayBufferWriter = new ArrayBufferWriter<byte>();
+                            var writer = new MessagePackWriter(arrayBufferWriter);
+                            writer.Write(Guid.Empty);
+                            writer.Flush();
+
+                            handler.Send(arrayBufferWriter.WrittenSpan);
+
+                            continue;
+                        }
+                    }
+
+                    if (opCode == ClientOp.SchemasGet)
+                    {
+                        handler.Send(new byte[] { 0, 0, 0, 6 }); // Size.
+                        handler.Send(new byte[] { 0, requestId, 0 });
+
+                        var arrayBufferWriter = new ArrayBufferWriter<byte>();
+                        var writer = new MessagePackWriter(arrayBufferWriter);
+                        writer.WriteMapHeader(1);
+                        writer.Write(1); // Version.
+                        writer.WriteArrayHeader(0); // Columns.
+                        writer.Flush();
+
+                        handler.Send(arrayBufferWriter.WrittenSpan);
+
+                        continue;
+                    }
+
+                    if (opCode == ClientOp.TupleUpsert)
+                    {
+                        handler.Send(new byte[] { 0, 0, 0, 3 }); // Size.
+                        handler.Send(new byte[] { 0, requestId, 0 }); // No payload.
+
+                        continue;
+                    }
+
+                    if (opCode == ClientOp.TxBegin)
+                    {
+                        handler.Send(new byte[] { 0, 0, 0, 4 }); // Size.
+                        handler.Send(new byte[] { 0, requestId, 0, 0 }); // Tx id.
+
+                        continue;
+                    }
+
+                    // Fake error message for any other op code.
+                    handler.Send(new byte[] { 0, 0, 0, 8 }); // Size.
+                    handler.Send(new byte[] { 0, requestId, 1, 160 | 4, (byte)Err[0], (byte)Err[1], (byte)Err[2], (byte)Err[3] });
+                }
+
+                handler.Disconnect(true);
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
new file mode 100644
index 0000000..22a5c64
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+    using System.Threading.Tasks;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests that <see cref="FakeServer"/> works as expected.
+    /// </summary>
+    public class FakeServerTests
+    {
+        [Test]
+        public async Task TestConnectToFakeServerAndGetTablesReturnsEmptyList()
+        {
+            using var server = new FakeServer();
+            using var client = await server.ConnectClientAsync();
+
+            var tables = await client.Tables.GetTablesAsync();
+            Assert.AreEqual(0, tables.Count);
+        }
+
+        [Test]
+        public async Task TestConnectToFakeServerAndGetTableThrowsError()
+        {
+            using var server = new FakeServer();
+            using var client = await server.ConnectClientAsync();
+
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTableAsync("t"));
+            Assert.AreEqual(FakeServer.Err, ex!.Message);
+        }
+
+        [Test]
+        public async Task TestConnectToFakeServerAndGetExistingTableReturnsTable()
+        {
+            using var server = new FakeServer();
+            using var client = await server.ConnectClientAsync();
+
+            var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+            Assert.IsNotNull(table);
+            Assert.AreEqual(FakeServer.ExistingTableName, table!.Name);
+        }
+
+        [Test]
+        public async Task TestFakeServerDropsConnectionOnSpecifiedRequestCount()
+        {
+            using var server = new FakeServer(reqId => reqId % 3 == 0);
+            using var client = await server.ConnectClientAsync();
+
+            // 2 requests succeed, 3rd fails.
+            await client.Tables.GetTablesAsync();
+            await client.Tables.GetTablesAsync();
+
+            Assert.CatchAsync(async () => await client.Tables.GetTablesAsync());
+
+            // Reconnect by FailoverSocket logic.
+            await client.Tables.GetTablesAsync();
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
index af20911..09208a3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
@@ -62,6 +62,7 @@ namespace Apache.Ignite.Tests
                 if (file.Contains(InternalDir, StringComparison.Ordinal) ||
                     file.Contains(".Tests", StringComparison.Ordinal) ||
                     file.Contains(".Benchmarks", StringComparison.Ordinal) ||
+                    file.EndsWith("RetryLimitPolicy.cs", StringComparison.Ordinal) ||
                     file.EndsWith("Exception.cs", StringComparison.Ordinal))
                 {
                     continue;
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ClientOpExtensionsTest.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ClientOpExtensionsTest.cs
index 19a08df..50ccdba 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ClientOpExtensionsTest.cs
@@ -15,30 +15,26 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite.Tests.Proto
 {
-    using MessagePack;
+    using System.Linq;
+    using Internal.Proto;
+    using NUnit.Framework;
 
     /// <summary>
-    /// MessagePack utils.
+    /// Tests for <see cref="ClientOpExtensions"/>.
     /// </summary>
-    internal static class MessagePackUtil
+    public class ClientOpExtensionsTest
     {
-        /// <summary>
-        /// Gets the write size for the specified value.
-        /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
+        [Test]
+        public void TestToPublicOperationTypeSupportsAllOps()
         {
-            return value switch
+            var ops = typeof(ClientOp).GetEnumValues().Cast<ClientOp>().ToList();
+
+            foreach (var op in ops)
             {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
+                Assert.DoesNotThrow(() => op.ToPublicOperationType(), op.ToString());
+            }
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
index 0197e20..e97b056 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
@@ -118,7 +118,11 @@ namespace Apache.Ignite.Tests.Proto
             writer.Write(Guid.Parse(JavaUuidString));
             writer.Flush();
 
-            var bytes = bufferWriter.GetWrittenMemory()[4..].ToArray().Select(b => (sbyte) b).ToArray();
+            var bytes = bufferWriter.GetWrittenMemory()[PooledArrayBufferWriter.ReservedPrefixSize..]
+                .ToArray()
+                .Select(b => (sbyte) b)
+                .ToArray();
+
             CollectionAssert.AreEqual(JavaUuidBytes, bytes);
         }
 
@@ -154,7 +158,7 @@ namespace Apache.Ignite.Tests.Proto
             var bufferWriter = new PooledArrayBufferWriter();
             write(bufferWriter);
 
-            var mem = bufferWriter.GetWrittenMemory()[4..]; // Skip length.
+            var mem = bufferWriter.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize);
             return read(mem);
         }
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
new file mode 100644
index 0000000..d8c35d3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests
+{
+    using System.Collections.Generic;
+    using System.Threading.Tasks;
+    using Ignite.Table;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests client behavior with different <see cref="IgniteClientConfiguration.RetryPolicy"/> settings.
+    /// </summary>
+    public class RetryPolicyTests
+    {
+        private const int IterCount = 100;
+
+        [Test]
+        public async Task TestFailoverWithRetryPolicyCompletesOperationWithoutException()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new RetryAllPolicy { RetryLimit = 1 }
+            };
+
+            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            for (int i = 0; i < IterCount; i++)
+            {
+                await client.Tables.GetTablesAsync();
+            }
+        }
+
+        [Test]
+        public async Task TestFailoverWithRetryPolicyDoesNotRetryUnrelatedErrors()
+        {
+            var cfg = new IgniteClientConfiguration { RetryPolicy = RetryAllPolicy.Instance };
+
+            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTableAsync("bad-table"));
+            Assert.AreEqual(FakeServer.Err, ex!.Message);
+        }
+
+        [Test]
+        public async Task TestFailoverWithRetryPolicyThrowsOnRetryLimitExceeded()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new TestRetryPolicy { RetryLimit = 5 }
+            };
+
+            using var server = new FakeServer(reqId => reqId > 1);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            await client.Tables.GetTablesAsync();
+
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTablesAsync());
+            Assert.AreEqual("Operation failed after 5 retries, examine InnerException for details.", ex!.Message);
+        }
+
+        [Test]
+        public async Task TestZeroRetryLimitDoesNotLimitRetryCount()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new RetryAllPolicy { RetryLimit = 0 }
+            };
+
+            using var server = new FakeServer(reqId => reqId % 10 != 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            for (var i = 0; i < IterCount; i++)
+            {
+                await client.Tables.GetTablesAsync();
+            }
+        }
+
+        [Test]
+        public async Task TestRetryPolicyIsDisabledByDefault()
+        {
+            using var server = new FakeServer(reqId => reqId > 1);
+            using var client = await server.ConnectClientAsync();
+
+            await client.Tables.GetTablesAsync();
+
+            Assert.ThrowsAsync<IgniteClientException>(async () => await client.Tables.GetTablesAsync());
+        }
+
+        [Test]
+        public async Task TestCustomRetryPolicyIsInvokedWithCorrectContext()
+        {
+            var testRetryPolicy = new TestRetryPolicy { RetryLimit = 3 };
+
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = testRetryPolicy
+            };
+
+            using var server = new FakeServer(reqId => reqId % 3 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            for (var i = 0; i < IterCount; i++)
+            {
+                await client.Tables.GetTablesAsync();
+            }
+
+            Assert.AreEqual(49, testRetryPolicy.Invocations.Count);
+
+            var inv = testRetryPolicy.Invocations[0];
+
+            Assert.AreNotSame(cfg, inv.Configuration);
+            Assert.AreSame(testRetryPolicy, inv.Configuration.RetryPolicy);
+            Assert.AreEqual(ClientOperationType.TablesGet, inv.Operation);
+            Assert.AreEqual(0, inv.Iteration);
+        }
+
+        [Test]
+        public async Task TestTableOperationWithoutTxIsRetried()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new TestRetryPolicy { RetryLimit = 1 }
+            };
+
+            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            for (int i = 0; i < IterCount; i++)
+            {
+                var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+
+                await table!.RecordBinaryView.UpsertAsync(null, new IgniteTuple());
+            }
+        }
+
+        [Test]
+        public async Task TestTableOperationWithTxIsNotRetried()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new TestRetryPolicy()
+            };
+
+            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+            var tx = await client.Transactions.BeginAsync();
+
+            var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () => await table!.RecordBinaryView.UpsertAsync(tx, new IgniteTuple()));
+            StringAssert.StartsWith("Socket is closed due to an error", ex!.Message);
+        }
+
+        [Test]
+        public async Task TestRetryOperationWithPayloadReusesPooledBufferCorrectly()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new TestRetryPolicy { RetryLimit = 1 }
+            };
+
+            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            for (int i = 0; i < IterCount; i++)
+            {
+                var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+                Assert.IsNotNull(table);
+            }
+        }
+
+        [Test]
+        public async Task TestRetryReadPolicyDoesNotRetryWriteOperations()
+        {
+            var cfg = new IgniteClientConfiguration
+            {
+                RetryPolicy = new RetryReadPolicy()
+            };
+
+            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var client = await server.ConnectClientAsync(cfg);
+
+            var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+            Assert.ThrowsAsync<IgniteClientException>(async () => await table!.RecordBinaryView.UpsertAsync(null, new IgniteTuple()));
+        }
+
+        private class TestRetryPolicy : RetryLimitPolicy
+        {
+            private readonly List<IRetryPolicyContext> _invocations = new();
+
+            public IReadOnlyList<IRetryPolicyContext> Invocations => _invocations;
+
+            public override bool ShouldRetry(IRetryPolicyContext context)
+            {
+                _invocations.Add(context);
+
+                return base.ShouldRetry(context);
+            }
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
index 19a08df..19e1656 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryReadPolicyTests.cs
@@ -15,30 +15,28 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite.Tests
 {
-    using MessagePack;
+    using System.Linq;
+    using Internal;
+    using NUnit.Framework;
 
     /// <summary>
-    /// MessagePack utils.
+    /// Tests for <see cref="RetryReadPolicy"/>.
     /// </summary>
-    internal static class MessagePackUtil
+    public class RetryReadPolicyTests
     {
-        /// <summary>
-        /// Gets the write size for the specified value.
-        /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
+        [Test]
+        public void TestRetryReadPolicySupportsAllOperations()
         {
-            return value switch
+            var opTypes = typeof(ClientOperationType).GetEnumValues().Cast<ClientOperationType>().ToList();
+
+            foreach (var opType in opTypes)
             {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
+                var ctx = new RetryPolicyContext(new(), opType, 1, new());
+
+                Assert.DoesNotThrow(() => RetryReadPolicy.Instance.ShouldRetry(ctx), opType.ToString());
+            }
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
index f18df9f..b880d17 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
@@ -54,9 +54,9 @@ namespace Apache.Ignite.Tests.Table.Serialization
         [Test]
         public void TestWriteUnsigned()
         {
-            var pooledWriter = Write(new UnsignedPoco(ulong.MaxValue, "foo"));
+            var bytes = Write(new UnsignedPoco(ulong.MaxValue, "foo"));
 
-            var resMem = pooledWriter.GetWrittenMemory()[4..]; // Skip length header.
+            var resMem = bytes[PooledArrayBufferWriter.ReservedPrefixSize..]; // Skip length header.
             var reader = new MessagePackReader(resMem);
 
             Assert.AreEqual(ulong.MaxValue, reader.ReadUInt64());
@@ -132,13 +132,13 @@ namespace Apache.Ignite.Tests.Table.Serialization
 
         private static MessagePackReader WriteAndGetReader(bool keyOnly = false)
         {
-            var pooledWriter = Write(new Poco { Key = 1234, Val = "foo" }, keyOnly);
+            var bytes = Write(new Poco { Key = 1234, Val = "foo" }, keyOnly);
 
-            var resMem = pooledWriter.GetWrittenMemory()[4..]; // Skip length header.
+            var resMem = bytes[PooledArrayBufferWriter.ReservedPrefixSize..]; // Skip length header.
             return new MessagePackReader(resMem);
         }
 
-        private static PooledArrayBufferWriter Write<T>(T obj, bool keyOnly = false)
+        private static byte[] Write<T>(T obj, bool keyOnly = false)
             where T : class
         {
             var handler = new ObjectSerializerHandler<T>();
@@ -148,7 +148,7 @@ namespace Apache.Ignite.Tests.Table.Serialization
 
             handler.Write(ref writer, Schema, obj, keyOnly);
             writer.Flush();
-            return pooledWriter;
+            return pooledWriter.GetWrittenMemory().ToArray();
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
new file mode 100644
index 0000000..8ea75bb
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite
+{
+    using Table;
+
+    /// <summary>
+    /// Client operation type.
+    /// </summary>
+    public enum ClientOperationType
+    {
+        /// <summary>
+        /// Get tables (<see cref="ITables.GetTablesAsync"/>).
+        /// </summary>
+        TablesGet,
+
+        /// <summary>
+        /// Get table (<see cref="ITables.GetTableAsync"/>).
+        /// </summary>
+        TableGet,
+
+        /// <summary>
+        /// Upsert (<see cref="IRecordView{T}.UpsertAsync"/>).
+        /// </summary>
+        TupleUpsert,
+
+        /// <summary>
+        /// Get (<see cref="IRecordView{T}.GetAsync"/>).
+        /// </summary>
+        TupleGet,
+
+        /// <summary>
+        /// Upsert (<see cref="IRecordView{T}.UpsertAllAsync"/>).
+        /// </summary>
+        TupleUpsertAll,
+
+        /// <summary>
+        /// Get All (<see cref="IRecordView{T}.GetAllAsync"/>).
+        /// </summary>
+        TupleGetAll,
+
+        /// <summary>
+        /// Get and Upsert (<see cref="IRecordView{T}.GetAndUpsertAsync"/>).
+        /// </summary>
+        TupleGetAndUpsert,
+
+        /// <summary>
+        /// Insert (<see cref="IRecordView{T}.InsertAsync"/>).
+        /// </summary>
+        TupleInsert,
+
+        /// <summary>
+        /// Insert All (<see cref="IRecordView{T}.InsertAllAsync"/>).
+        /// </summary>
+        TupleInsertAll,
+
+        /// <summary>
+        /// Replace (<see cref="IRecordView{T}.ReplaceAsync(Apache.Ignite.Transactions.ITransaction?,T)"/>).
+        /// </summary>
+        TupleReplace,
+
+        /// <summary>
+        /// Replace Exact (<see cref="IRecordView{T}.ReplaceAsync(Apache.Ignite.Transactions.ITransaction?,T, T)"/>).
+        /// </summary>
+        TupleReplaceExact,
+
+        /// <summary>
+        /// Get and Replace (<see cref="IRecordView{T}.GetAndReplaceAsync"/>).
+        /// </summary>
+        TupleGetAndReplace,
+
+        /// <summary>
+        /// Delete (<see cref="IRecordView{T}.DeleteAsync"/>).
+        /// </summary>
+        TupleDelete,
+
+        /// <summary>
+        /// Delete All (<see cref="IRecordView{T}.DeleteAllAsync"/>).
+        /// </summary>
+        TupleDeleteAll,
+
+        /// <summary>
+        /// Delete Exact (<see cref="IRecordView{T}.DeleteExactAsync"/>).
+        /// </summary>
+        TupleDeleteExact,
+
+        /// <summary>
+        /// Delete All Exact (<see cref="IRecordView{T}.DeleteAllExactAsync"/>).
+        /// </summary>
+        TupleDeleteAllExact,
+
+        /// <summary>
+        /// Get and Delete (<see cref="IRecordView{T}.GetAndDeleteAsync"/>).
+        /// </summary>
+        TupleGetAndDelete
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicy.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/IRetryPolicy.cs
index 19a08df..c814909 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicy.cs
@@ -15,30 +15,22 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
 {
-    using MessagePack;
-
     /// <summary>
-    /// MessagePack utils.
+    /// Client retry policy determines whether client operations that have failed due to a connection issue
+    /// should be retried.
     /// </summary>
-    internal static class MessagePackUtil
+    public interface IRetryPolicy
     {
         /// <summary>
-        /// Gets the write size for the specified value.
+        /// Gets a value indicating whether a client operation that has failed due to a connection issue
+        /// should be retried.
         /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
-        {
-            return value switch
-            {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
-        }
+        /// <param name="context">Operation context.</param>
+        /// <returns>
+        /// <c>true</c> if the operation should be retried on another connection, <c>false</c> otherwise.
+        /// </returns>
+        bool ShouldRetry(IRetryPolicyContext context);
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicyContext.cs
similarity index 57%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/IRetryPolicyContext.cs
index 19a08df..c7e9706 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IRetryPolicyContext.cs
@@ -15,30 +15,33 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
 {
-    using MessagePack;
+    using System;
 
     /// <summary>
-    /// MessagePack utils.
+    /// Retry policy context. See <see cref="IRetryPolicy.ShouldRetry"/>.
     /// </summary>
-    internal static class MessagePackUtil
+    public interface IRetryPolicyContext
     {
         /// <summary>
-        /// Gets the write size for the specified value.
+        /// Gets the client configuration.
         /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
-        {
-            return value switch
-            {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
-        }
+        IgniteClientConfiguration Configuration { get; }
+
+        /// <summary>
+        /// Gets the operation type.
+        /// </summary>
+        ClientOperationType Operation { get; }
+
+        /// <summary>
+        /// Gets the current iteration.
+        /// </summary>
+        int Iteration { get; }
+
+        /// <summary>
+        /// Gets the exception that caused current retry iteration.
+        /// </summary>
+        Exception Exception { get; }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
index 83651c5..fd6981b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientConfiguration.cs
@@ -73,6 +73,7 @@ namespace Apache.Ignite
             Logger = other.Logger;
             SocketTimeout = other.SocketTimeout;
             Endpoints = other.Endpoints.ToList();
+            RetryPolicy = other.RetryPolicy;
         }
 
         /// <summary>
@@ -97,5 +98,16 @@ namespace Apache.Ignite
         ///  * my-host.com:780..787 (custom port range).
         /// </summary>
         public IList<string> Endpoints { get; } = new List<string>();
+
+        /// <summary>
+        /// Gets or sets the retry policy. When a request fails due to a connection error,
+        /// Ignite will retry the request if the specified policy allows it.
+        /// <para />
+        /// Default is <see cref="RetryNonePolicy"/> - does not retry anything.
+        /// <para />
+        /// See also <see cref="RetryAllPolicy"/>, <see cref="RetryReadPolicy"/>, <see cref="RetryNonePolicy"/>,
+        /// <see cref="RetryLimitPolicy.RetryLimit"/>.
+        /// </summary>
+        public IRetryPolicy RetryPolicy { get; set; } = RetryNonePolicy.Instance;
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
index 76e38a6..a296321 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
@@ -40,8 +40,10 @@ namespace Apache.Ignite.Internal.Buffers
     /// </summary>
     internal sealed class PooledArrayBufferWriter : IBufferWriter<byte>, IDisposable
     {
-        /** Reserved prefix size. */
-        private const int ReservedPrefixSize = 4 + 4 + 9; // Size (4 bytes) + OpCode (4 bytes) + RequestId (9 bytes)/
+        /// <summary>
+        /// Reserved prefix size.
+        /// </summary>
+        public const int ReservedPrefixSize = 4 + 5 + 9; // Size (4 bytes) + OpCode (5 bytes) + RequestId (9 bytes)/
 
         /** Underlying pooled array. */
         private byte[] _buffer;
@@ -49,14 +51,8 @@ namespace Apache.Ignite.Internal.Buffers
         /** Index within the array. */
         private int _index;
 
-        /** Index within the array: backup for prefix writer mode. */
-        private int _indexBackup;
-
-        /** Start index within the array. */
-        private int _startIndex;
-
         /** Disposed flag. */
-        private bool _disposed;
+        private volatile bool _disposed;
 
         /// <summary>
         /// Initializes a new instance of the <see cref="PooledArrayBufferWriter"/> class.
@@ -68,7 +64,6 @@ namespace Apache.Ignite.Internal.Buffers
             // https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#buffering
             _buffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
             _index = ReservedPrefixSize;
-            _startIndex = _index;
         }
 
         /// <summary>
@@ -77,29 +72,14 @@ namespace Apache.Ignite.Internal.Buffers
         private int FreeCapacity => _buffer.Length - _index;
 
         /// <summary>
-        /// Gets the written memory.
+        /// Gets the written memory, including reserved prefix (see <see cref="ReservedPrefixSize"/>).
         /// </summary>
         /// <returns>Written array.</returns>
-        public unsafe ReadOnlyMemory<byte> GetWrittenMemory()
+        public Memory<byte> GetWrittenMemory()
         {
-            if (_indexBackup > 0)
-            {
-                _index = _indexBackup;
-            }
-
-            // Write big-endian message size to the start of the buffer.
-            const int sizeLen = 4;
-            Debug.Assert(_startIndex >= sizeLen, "_startIndex >= 4");
-
-            var messageSize = _index - _startIndex;
-            var startIndex = _startIndex - sizeLen;
+            Debug.Assert(!_disposed, "!_disposed");
 
-            fixed (byte* bufPtr = &_buffer[startIndex])
-            {
-                *(int*)bufPtr = IPAddress.HostToNetworkOrder(messageSize);
-            }
-
-            return new(_buffer, startIndex, _index - startIndex);
+            return new(_buffer, start: 0, length: _index);
         }
 
         /// <inheritdoc />
@@ -139,22 +119,6 @@ namespace Apache.Ignite.Internal.Buffers
         public MessagePackWriter GetMessageWriter() => new(this);
 
         /// <summary>
-        /// Gets the <see cref="MessagePackWriter"/> for this buffer prefix.
-        /// </summary>
-        /// <param name="prefixSize">Prefix size.</param>
-        /// <returns><see cref="MessagePackWriter"/> for this buffer.</returns>
-        public MessagePackWriter GetPrefixWriter(int prefixSize)
-        {
-            Debug.Assert(prefixSize < _startIndex, "prefixSize < _startIndex");
-
-            _indexBackup = _index;
-            _startIndex -= prefixSize;
-            _index = _startIndex;
-
-            return new(this);
-        }
-
-        /// <summary>
         /// Reserves space for an int32 value and returns its position.
         /// </summary>
         /// <returns>Reserved int position. To be used with <see cref="WriteInt32"/>.</returns>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
index ec025a8..4cfc7a1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs
@@ -98,9 +98,25 @@ namespace Apache.Ignite.Internal
         /// <returns>Response data.</returns>
         public async Task<PooledBuffer> DoOutInOpAsync(ClientOp clientOp, PooledArrayBufferWriter? request = null)
         {
-            var socket = await GetSocketAsync().ConfigureAwait(false);
+            var attempt = 0;
+            List<Exception>? errors = null;
 
-            return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+            while (true)
+            {
+                try
+                {
+                    var socket = await GetSocketAsync().ConfigureAwait(false);
+
+                    return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+                }
+                catch (Exception e)
+                {
+                    if (!HandleOpError(e, clientOp, ref attempt, ref errors))
+                    {
+                        throw;
+                    }
+                }
+            }
         }
 
         /// <inheritdoc/>
@@ -252,5 +268,90 @@ namespace Apache.Ignite.Internal
                 throw;
             }
         }
+
+        /// <summary>
+        /// Gets a value indicating whether a failed operation should be retried.
+        /// </summary>
+        /// <param name="exception">Exception that caused the operation to fail.</param>
+        /// <param name="op">Operation code.</param>
+        /// <param name="attempt">Current attempt.</param>
+        /// <returns>
+        /// <c>true</c> if the operation should be retried on another connection, <c>false</c> otherwise.
+        /// </returns>
+        private bool ShouldRetry(Exception exception, ClientOp op, int attempt)
+        {
+            var e = exception;
+
+            while (e != null && !(e is SocketException))
+            {
+                e = e.InnerException;
+            }
+
+            if (e == null)
+            {
+                // Only retry socket exceptions.
+                return false;
+            }
+
+            if (Configuration.RetryPolicy is RetryNonePolicy)
+            {
+                return false;
+            }
+
+            var publicOpType = op.ToPublicOperationType();
+
+            if (publicOpType == null)
+            {
+                // System operation.
+                return true;
+            }
+
+            var ctx = new RetryPolicyContext(new(Configuration), publicOpType.Value, attempt, exception);
+
+            return Configuration.RetryPolicy.ShouldRetry(ctx);
+        }
+
+        /// <summary>
+        /// Handles operation error.
+        /// </summary>
+        /// <param name="exception">Error.</param>
+        /// <param name="op">Operation code.</param>
+        /// <param name="attempt">Current attempt.</param>
+        /// <param name="errors">Previous errors.</param>
+        /// <returns>True if the error was handled, false otherwise.</returns>
+        private bool HandleOpError(
+            Exception exception,
+            ClientOp op,
+            ref int attempt,
+            ref List<Exception>? errors)
+        {
+            if (!ShouldRetry(exception, op, attempt))
+            {
+                if (errors == null)
+                {
+                    return false;
+                }
+
+                errors.Add(exception);
+                var inner = new AggregateException(errors);
+
+                throw new IgniteClientException(
+                    $"Operation failed after {attempt} retries, examine InnerException for details.",
+                    inner);
+            }
+
+            if (errors == null)
+            {
+                errors = new List<Exception> { exception };
+            }
+            else
+            {
+                errors.Add(exception);
+            }
+
+            attempt++;
+
+            return true;
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 9d497e6..d362fe0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Internal
     using System.Collections.Concurrent;
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
     using System.Net;
     using System.Net.Sockets;
     using System.Threading;
@@ -31,8 +32,6 @@ namespace Apache.Ignite.Internal
     using MessagePack;
     using Proto;
 
-    using static Proto.MessagePackUtil;
-
     /// <summary>
     /// Wrapper over framework socket for Ignite thin client operations.
     /// </summary>
@@ -71,9 +70,15 @@ namespace Apache.Ignite.Internal
         /** Logger. */
         private readonly IIgniteLogger? _logger;
 
+        /** Pre-allocated buffer for message size + op code + request id. To be used under <see cref="_sendLock"/>. */
+        private readonly byte[] _prefixBuffer = new byte[PooledArrayBufferWriter.ReservedPrefixSize];
+
         /** Request id generator. */
         private long _requestId;
 
+        /** Exception that caused this socket to close. */
+        private volatile Exception? _exception;
+
         /// <summary>
         /// Initializes a new instance of the <see cref="ClientSocket"/> class.
         /// </summary>
@@ -141,6 +146,13 @@ namespace Apache.Ignite.Internal
         /// <returns>Response data.</returns>
         public Task<PooledBuffer> DoOutInOpAsync(ClientOp clientOp, PooledArrayBufferWriter? request = null)
         {
+            var ex = _exception;
+
+            if (ex != null)
+            {
+                throw new IgniteClientException("Socket is closed due to an error, examine inner exception for details.", ex);
+            }
+
             if (_disposeTokenSource.IsCancellationRequested)
             {
                 throw new ObjectDisposedException(nameof(ClientSocket));
@@ -148,15 +160,11 @@ namespace Apache.Ignite.Internal
 
             var requestId = Interlocked.Increment(ref _requestId);
 
-            request ??= new PooledArrayBufferWriter(32);
-
-            WritePrefix(request, clientOp, requestId);
-
             var taskCompletionSource = new TaskCompletionSource<PooledBuffer>();
 
             _requests[requestId] = taskCompletionSource;
 
-            SendRequestAsync(request)
+            SendRequestAsync(request, clientOp, requestId)
                 .AsTask()
                 .ContinueWith(
                     (task, state) =>
@@ -183,8 +191,7 @@ namespace Apache.Ignite.Internal
         /// <inheritdoc/>
         public void Dispose()
         {
-            _disposeTokenSource.Cancel();
-            _stream.Dispose();
+            Dispose(null);
         }
 
         /// <summary>
@@ -210,7 +217,7 @@ namespace Apache.Ignite.Internal
 
             try
             {
-                await stream.ReadAsync(responseMagic.AsMemory(0, ProtoCommon.MagicBytes.Length)).ConfigureAwait(false);
+                await ReceiveBytesAsync(stream, responseMagic, ProtoCommon.MagicBytes.Length, CancellationToken.None).ConfigureAwait(false);
 
                 for (var i = 0; i < ProtoCommon.MagicBytes.Length; i++)
                 {
@@ -272,7 +279,7 @@ namespace Apache.Ignite.Internal
 
             try
             {
-                await stream.ReadAsync(bytes.AsMemory(0, size), cancellationToken).ConfigureAwait(false);
+                await ReceiveBytesAsync(stream, bytes, size, cancellationToken).ConfigureAwait(false);
 
                 return new PooledBuffer(bytes, 0, size);
             }
@@ -292,11 +299,35 @@ namespace Apache.Ignite.Internal
             const int messageSizeByteCount = 4;
             Debug.Assert(buffer.Length >= messageSizeByteCount, "buffer.Length >= messageSizeByteCount");
 
-            await stream.ReadAsync(buffer.AsMemory(0, messageSizeByteCount), cancellationToken).ConfigureAwait(false);
+            await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, cancellationToken).ConfigureAwait(false);
 
             return GetMessageSize(buffer);
         }
 
+        private static async Task ReceiveBytesAsync(
+            NetworkStream stream,
+            byte[] buffer,
+            int size,
+            CancellationToken cancellationToken)
+        {
+            int received = 0;
+
+            while (received < size)
+            {
+                var res = await stream.ReadAsync(buffer.AsMemory(received, size - received), cancellationToken).ConfigureAwait(false);
+
+                if (res == 0)
+                {
+                    // Disconnected.
+                    throw new IgniteClientException(
+                        "Connection lost (failed to read data from socket)",
+                        new SocketException((int) SocketError.ConnectionAborted));
+                }
+
+                received += res;
+            }
+        }
+
         private static unsafe int GetMessageSize(byte[] responseLenBytes)
         {
             fixed (byte* len = &responseLenBytes[0])
@@ -312,7 +343,13 @@ namespace Apache.Ignite.Internal
             using var bufferWriter = new PooledArrayBufferWriter();
             WriteHandshake(version, bufferWriter.GetMessageWriter());
 
-            await stream.WriteAsync(bufferWriter.GetWrittenMemory()).ConfigureAwait(false);
+            // Prepend size.
+            var buf = bufferWriter.GetWrittenMemory();
+            var size = buf.Length - PooledArrayBufferWriter.ReservedPrefixSize;
+            var resBuf = buf.Slice(PooledArrayBufferWriter.ReservedPrefixSize - 4);
+            WriteMessageSize(resBuf, size);
+
+            await stream.WriteAsync(resBuf).ConfigureAwait(false);
         }
 
         private static void WriteHandshake(ClientProtocolVersion version, MessagePackWriter w)
@@ -330,46 +367,79 @@ namespace Apache.Ignite.Internal
             w.Flush();
         }
 
-        private static void WritePrefix(PooledArrayBufferWriter writer, ClientOp clientOp, long requestId)
+        private static unsafe void WriteMessageSize(Memory<byte> target, int size)
         {
-            var writeSize = GetWriteSize((ulong)clientOp) +
-                            GetWriteSize((ulong)requestId);
-
-            var w = writer.GetPrefixWriter(writeSize);
-
-            w.Write((int)clientOp);
-            w.Write(requestId);
-
-            w.Flush();
+            fixed (byte* bufPtr = target.Span)
+            {
+                *(int*)bufPtr = IPAddress.HostToNetworkOrder(size);
+            }
         }
 
-        private async ValueTask SendRequestAsync(PooledArrayBufferWriter request)
+        private async ValueTask SendRequestAsync(PooledArrayBufferWriter? request, ClientOp op, long requestId)
         {
             await _sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
 
             try
             {
-                await _stream.WriteAsync(request.GetWrittenMemory(), _disposeTokenSource.Token).ConfigureAwait(false);
+                var prefixMem = _prefixBuffer.AsMemory()[4..];
+                var prefixSize = MessagePackUtil.WriteUnsigned(prefixMem, (int)op);
+                prefixSize += MessagePackUtil.WriteUnsigned(prefixMem[prefixSize..], requestId);
+
+                if (request != null)
+                {
+                    var requestBuf = request.GetWrittenMemory();
+
+                    WriteMessageSize(_prefixBuffer, prefixSize + requestBuf.Length - PooledArrayBufferWriter.ReservedPrefixSize);
+                    var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
+
+                    var requestBufStart = PooledArrayBufferWriter.ReservedPrefixSize - prefixBytes.Length;
+                    var requestBufWithPrefix = requestBuf.Slice(requestBufStart);
+
+                    // Copy prefix to request buf to avoid extra WriteAsync call for the prefix.
+                    prefixBytes.CopyTo(requestBufWithPrefix);
+
+                    await _stream.WriteAsync(requestBufWithPrefix, _disposeTokenSource.Token).ConfigureAwait(false);
+                }
+                else
+                {
+                    // Request without body, send only the prefix.
+                    WriteMessageSize(_prefixBuffer, prefixSize);
+                    var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
+                    await _stream.WriteAsync(prefixBytes, _disposeTokenSource.Token).ConfigureAwait(false);
+                }
             }
             finally
             {
                 _sendLock.Release();
-                request.Dispose(); // Release pooled buffer as soon as possible.
             }
         }
 
+        [SuppressMessage(
+            "Microsoft.Design",
+            "CA1031:DoNotCatchGeneralExceptionTypes",
+            Justification = "Any exception in receive loop should be handled.")]
         private async Task RunReceiveLoop(CancellationToken cancellationToken)
         {
             // Reuse the same array for all responses.
             var messageSizeBytes = new byte[4];
 
-            while (!cancellationToken.IsCancellationRequested)
+            try
             {
-                PooledBuffer response = await ReadResponseAsync(_stream, messageSizeBytes, cancellationToken).ConfigureAwait(false);
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    PooledBuffer response = await ReadResponseAsync(_stream, messageSizeBytes, cancellationToken).ConfigureAwait(false);
+
+                    // Invoke response handler in another thread to continue the receive loop.
+                    // Response buffer should be disposed by the task handler.
+                    ThreadPool.QueueUserWorkItem(r => HandleResponse((PooledBuffer)r), response);
+                }
+            }
+            catch (Exception e)
+            {
+                const string message = "Exception while reading from socket. Connection closed.";
 
-                // Invoke response handler in another thread to continue the receive loop.
-                // Response buffer should be disposed by the task handler.
-                ThreadPool.QueueUserWorkItem(r => HandleResponse((PooledBuffer)r), response);
+                _logger?.Error(message, e);
+                Dispose(new IgniteClientException(message, e));
             }
         }
 
@@ -389,8 +459,9 @@ namespace Apache.Ignite.Internal
 
             if (!_requests.TryRemove(requestId, out var taskCompletionSource))
             {
-                _logger?.Error($"Unexpected response ID ({requestId}) received from the server, closing the socket.");
-                Dispose();
+                var message = $"Unexpected response ID ({requestId}) received from the server, closing the socket.";
+                _logger?.Error(message);
+                Dispose(new IgniteClientException(message));
 
                 return;
             }
@@ -409,5 +480,34 @@ namespace Apache.Ignite.Internal
                 taskCompletionSource.SetResult(resultBuffer);
             }
         }
+
+        /// <summary>
+        /// Disposes this socket and completes active requests with the specified exception.
+        /// </summary>
+        /// <param name="ex">Exception that caused this socket to close. Null when socket is closed by the user.</param>
+        private void Dispose(Exception? ex)
+        {
+            if (_disposeTokenSource.IsCancellationRequested)
+            {
+                return;
+            }
+
+            _disposeTokenSource.Cancel();
+            _exception = ex;
+            _stream.Dispose();
+
+            ex ??= new ObjectDisposedException("Connection closed.");
+
+            while (!_requests.IsEmpty)
+            {
+                foreach (var reqId in _requests.Keys.ToArray())
+                {
+                    if (_requests.TryRemove(reqId, out var req) && req != null)
+                    {
+                        req.TrySetException(ex);
+                    }
+                }
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 9998af6..677ece1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -22,12 +22,6 @@ namespace Apache.Ignite.Internal.Proto
     /// </summary>
     internal enum ClientOp
     {
-        /** Create table. */
-        TableCreate = 1,
-
-        /** Drop table. */
-        TableDrop = 2,
-
         /** Get tables. */
         TablesGet = 3,
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
new file mode 100644
index 0000000..ba822e3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto
+{
+    using System;
+
+    /// <summary>
+    /// Extensions for <see cref="ClientOp"/>.
+    /// </summary>
+    internal static class ClientOpExtensions
+    {
+        /// <summary>
+        /// Converts the internal op code to a public operation type.
+        /// </summary>
+        /// <param name="op">Operation code.</param>
+        /// <returns>Operation type.</returns>
+        public static ClientOperationType? ToPublicOperationType(this ClientOp op)
+        {
+            return op switch
+            {
+                ClientOp.TablesGet => ClientOperationType.TablesGet,
+                ClientOp.TableGet => ClientOperationType.TableGet,
+                ClientOp.SchemasGet => null,
+                ClientOp.TupleUpsert => ClientOperationType.TupleUpsert,
+                ClientOp.TupleGet => ClientOperationType.TupleGet,
+                ClientOp.TupleUpsertAll => ClientOperationType.TupleUpsertAll,
+                ClientOp.TupleGetAll => ClientOperationType.TupleGetAll,
+                ClientOp.TupleGetAndUpsert => ClientOperationType.TupleGetAndUpsert,
+                ClientOp.TupleInsert => ClientOperationType.TupleInsert,
+                ClientOp.TupleInsertAll => ClientOperationType.TupleInsertAll,
+                ClientOp.TupleReplace => ClientOperationType.TupleReplace,
+                ClientOp.TupleReplaceExact => ClientOperationType.TupleReplaceExact,
+                ClientOp.TupleGetAndReplace => ClientOperationType.TupleGetAndReplace,
+                ClientOp.TupleDelete => ClientOperationType.TupleDelete,
+                ClientOp.TupleDeleteAll => ClientOperationType.TupleDeleteAll,
+                ClientOp.TupleDeleteExact => ClientOperationType.TupleDeleteExact,
+                ClientOp.TupleDeleteAllExact => ClientOperationType.TupleDeleteAllExact,
+                ClientOp.TupleGetAndDelete => ClientOperationType.TupleGetAndDelete,
+                ClientOp.TxBegin => null,
+                ClientOp.TxCommit => null,
+                ClientOp.TxRollback => null,
+                _ => throw new ArgumentOutOfRangeException(nameof(op), op, message: null)
+            };
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
index 19a08df..043d9a8 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Internal.Proto
 {
+    using System;
     using MessagePack;
 
     /// <summary>
@@ -25,20 +26,63 @@ namespace Apache.Ignite.Internal.Proto
     internal static class MessagePackUtil
     {
         /// <summary>
-        /// Gets the write size for the specified value.
+        /// Writes an unsigned value to specified memory location and returns number of bytes written.
         /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
+        /// <param name="mem">Memory.</param>
+        /// <param name="val">Value.</param>
+        /// <returns>Bytes written.</returns>
+        public static int WriteUnsigned(Memory<byte> mem, long val)
         {
-            return value switch
+            unchecked
             {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
+                var span = mem.Span;
+
+                if (val <= MessagePackRange.MaxFixPositiveInt)
+                {
+                    span[0] = (byte)val;
+                    return 1;
+                }
+
+                if (val <= byte.MaxValue)
+                {
+                    span[0] = MessagePackCode.UInt8;
+                    span[1] = (byte)val;
+
+                    return 2;
+                }
+
+                if (val <= ushort.MaxValue)
+                {
+                    span[0] = MessagePackCode.UInt16;
+                    span[2] = (byte)val;
+                    span[1] = (byte)(val >> 8);
+
+                    return 3;
+                }
+
+                if (val <= uint.MaxValue)
+                {
+                    span[0] = MessagePackCode.UInt32;
+                    span[4] = (byte)val;
+                    span[3] = (byte)(val >> 8);
+                    span[2] = (byte)(val >> 16);
+                    span[1] = (byte)(val >> 24);
+
+                    return 5;
+                }
+
+                span[0] = MessagePackCode.UInt64;
+                span[8] = (byte)val;
+                span[7] = (byte)(val >> 8);
+                span[6] = (byte)(val >> 16);
+                span[5] = (byte)(val >> 24);
+                span[4] = (byte)(val >> 32);
+                span[3] = (byte)(val >> 40);
+                span[2] = (byte)(val >> 48);
+                span[1] = (byte)(val >> 56);
+
+                return 9;
+            }
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/RetryPolicyContext.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/RetryPolicyContext.cs
new file mode 100644
index 0000000..8987b57
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/RetryPolicyContext.cs
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal
+{
+    using System;
+
+    /// <summary>
+    /// Retry policy context.
+    /// </summary>
+    internal class RetryPolicyContext : IRetryPolicyContext
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="RetryPolicyContext"/> class.
+        /// </summary>
+        /// <param name="configuration">Configuration.</param>
+        /// <param name="operation">Operation.</param>
+        /// <param name="iteration">Iteration.</param>
+        /// <param name="exception">Exception.</param>
+        public RetryPolicyContext(
+            IgniteClientConfiguration configuration,
+            ClientOperationType operation,
+            int iteration,
+            Exception exception)
+        {
+            Configuration = configuration;
+            Operation = operation;
+            Iteration = iteration;
+            Exception = exception;
+        }
+
+        /// <inheritdoc/>
+        public IgniteClientConfiguration Configuration { get; }
+
+        /// <inheritdoc/>
+        public ClientOperationType Operation { get; }
+
+        /// <inheritdoc/>
+        public int Iteration { get; }
+
+        /// <inheritdoc/>
+        public Exception Exception { get; }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 6b69f9e..5f1edea 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -280,9 +280,19 @@ namespace Apache.Ignite.Internal.Table
             Transaction? tx,
             PooledArrayBufferWriter? request = null)
         {
-            var socket = await _table.GetSocket(tx).ConfigureAwait(false);
+            if (tx == null)
+            {
+                // Use failover socket with reconnect and retry behavior.
+                return await _table.Socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+            }
+
+            if (tx.FailoverSocket != _table.Socket)
+            {
+                throw new IgniteClientException("Specified transaction belongs to a different IgniteClient instance.");
+            }
 
-            return await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
+            // Use tx-specific socket without retry and failover.
+            return await tx.Socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
         }
 
         private async Task<PooledBuffer> DoRecordOutOpAsync(
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index f8d516f..943c2d4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -73,6 +73,11 @@ namespace Apache.Ignite.Internal.Table
         /// <inheritdoc/>
         public IRecordView<IIgniteTuple> RecordBinaryView { get; }
 
+        /// <summary>
+        /// Gets the associated socket.
+        /// </summary>
+        internal ClientFailoverSocket Socket => _socket;
+
         /// <inheritdoc/>
         public IRecordView<T> GetRecordView<T>()
             where T : class
@@ -103,26 +108,6 @@ namespace Apache.Ignite.Internal.Table
         }
 
         /// <summary>
-        /// Gets the socket.
-        /// </summary>
-        /// <param name="tx">Transaction.</param>
-        /// <returns>Socket.</returns>
-        internal ValueTask<ClientSocket> GetSocket(Transactions.Transaction? tx)
-        {
-            if (tx == null)
-            {
-                return _socket.GetSocketAsync();
-            }
-
-            if (tx.FailoverSocket != _socket)
-            {
-                throw new IgniteClientException("Specified transaction belongs to a different IgniteClient instance.");
-            }
-
-            return new ValueTask<ClientSocket>(tx.Socket);
-        }
-
-        /// <summary>
         /// Reads the schema.
         /// </summary>
         /// <param name="buf">Buffer.</param>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
index 19a08df..88416f1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryAllPolicy.cs
@@ -15,30 +15,16 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
 {
-    using MessagePack;
-
     /// <summary>
-    /// MessagePack utils.
+    /// Retry policy that always returns <c>true</c>.
     /// </summary>
-    internal static class MessagePackUtil
+    public sealed class RetryAllPolicy : RetryLimitPolicy
     {
         /// <summary>
-        /// Gets the write size for the specified value.
+        /// Singleton instance.
         /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
-        {
-            return value switch
-            {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
-        }
+        public static readonly RetryAllPolicy Instance = new();
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
similarity index 58%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
index 19a08df..d987228 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryLimitPolicy.cs
@@ -15,30 +15,32 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
 {
-    using MessagePack;
+    using Internal.Common;
 
     /// <summary>
-    /// MessagePack utils.
+    /// Retry policy that returns <c>true</c> when <see cref="IRetryPolicyContext.Iteration"/> is less than
+    /// the specified <see cref="RetryLimit"/>.
     /// </summary>
-    internal static class MessagePackUtil
+    public class RetryLimitPolicy : IRetryPolicy
     {
         /// <summary>
-        /// Gets the write size for the specified value.
+        /// Gets or sets the retry limit. 0 or less for no limit.
         /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
+        public int RetryLimit { get; set; }
+
+        /// <inheritdoc />
+        public virtual bool ShouldRetry(IRetryPolicyContext context)
         {
-            return value switch
+            IgniteArgumentCheck.NotNull(context, nameof(context));
+
+            if (RetryLimit <= 0)
             {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
+                return true;
+            }
+
+            return context.Iteration < RetryLimit;
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs b/modules/platforms/dotnet/Apache.Ignite/RetryNonePolicy.cs
similarity index 60%
copy from modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
copy to modules/platforms/dotnet/Apache.Ignite/RetryNonePolicy.cs
index 19a08df..d335621 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackUtil.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryNonePolicy.cs
@@ -15,30 +15,22 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Internal.Proto
+namespace Apache.Ignite
 {
-    using MessagePack;
-
     /// <summary>
-    /// MessagePack utils.
+    /// Retry policy that always returns false.
     /// </summary>
-    internal static class MessagePackUtil
+    public sealed class RetryNonePolicy : IRetryPolicy
     {
         /// <summary>
-        /// Gets the write size for the specified value.
+        /// Singleton instance.
         /// </summary>
-        /// <param name="value">Value.</param>
-        /// <returns>MessagePack write size.</returns>
-        public static int GetWriteSize(ulong value)
+        public static readonly RetryNonePolicy Instance = new();
+
+        /// <inheritdoc />
+        public bool ShouldRetry(IRetryPolicyContext context)
         {
-            return value switch
-            {
-                <= MessagePackRange.MaxFixPositiveInt => 1,
-                <= byte.MaxValue => 2,
-                <= ushort.MaxValue => 3,
-                <= uint.MaxValue => 5,
-                _ => 9
-            };
+            return false;
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
new file mode 100644
index 0000000..fcf3fa5
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite
+{
+    using System;
+    using Internal.Common;
+
+    /// <summary>
+    /// Retry policy that returns true for all read-only operations that do not modify data.
+    /// </summary>
+    public sealed class RetryReadPolicy : RetryLimitPolicy
+    {
+        /// <summary>
+        /// Singleton instance.
+        /// </summary>
+        public static readonly RetryReadPolicy Instance = new();
+
+        /// <inheritdoc />
+        public override bool ShouldRetry(IRetryPolicyContext context)
+        {
+            IgniteArgumentCheck.NotNull(context, nameof(context));
+
+            if (!base.ShouldRetry(context))
+            {
+                return false;
+            }
+
+            return context.Operation switch
+            {
+                ClientOperationType.TablesGet => true,
+                ClientOperationType.TableGet => true,
+                ClientOperationType.TupleUpsert => false,
+                ClientOperationType.TupleGet => true,
+                ClientOperationType.TupleUpsertAll => false,
+                ClientOperationType.TupleGetAll => true,
+                ClientOperationType.TupleGetAndUpsert => false,
+                ClientOperationType.TupleInsert => false,
+                ClientOperationType.TupleInsertAll => false,
+                ClientOperationType.TupleReplace => false,
+                ClientOperationType.TupleReplaceExact => false,
+                ClientOperationType.TupleGetAndReplace => false,
+                ClientOperationType.TupleDelete => false,
+                ClientOperationType.TupleDeleteAll => false,
+                ClientOperationType.TupleDeleteExact => false,
+                ClientOperationType.TupleDeleteAllExact => false,
+                ClientOperationType.TupleGetAndDelete => false,
+                var unsupported => throw new NotSupportedException("Unsupported operation type: " + unsupported)
+            };
+        }
+    }
+}