You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2023/05/15 10:58:40 UTC

[ignite-3] branch main updated: IGNITE-19355 .NET: Do not request same schema more than once (#2071)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d33cf4fa38 IGNITE-19355 .NET: Do not request same schema more than once (#2071)
d33cf4fa38 is described below

commit d33cf4fa38cb2ac657742016a5a69125f5a47259
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon May 15 13:58:34 2023 +0300

    IGNITE-19355 .NET: Do not request same schema more than once (#2071)
    
    Cache schema tasks instead of schemas (task results). If a given schema is already requested, await existing task instead of sending another request.
---
 .../Table/TableGetBenchmarks.cs                    |  2 +-
 .../Table/TableGetMultiThreadedBenchmarks.cs       |  2 +-
 .../Compute/ComputeClusterAwarenessTests.cs        |  4 +-
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       | 24 ++++--
 .../dotnet/Apache.Ignite.Tests/FakeServerTests.cs  |  2 +-
 .../dotnet/Apache.Ignite.Tests/MetricsTests.cs     |  2 +-
 .../dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs | 28 +++----
 .../Apache.Ignite.Tests/Table/SchemaUpdateTest.cs  | 87 ++++++++++++++++++++++
 .../dotnet/Apache.Ignite.Tests/TestUtils.cs        |  8 ++
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   | 59 +++++++++------
 10 files changed, 166 insertions(+), 52 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetBenchmarks.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetBenchmarks.cs
index 3567fee50a..63802e5a9a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetBenchmarks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetBenchmarks.cs
@@ -38,7 +38,7 @@ public class TableGetBenchmarks
     [GlobalSetup]
     public async Task GlobalSetup()
     {
-        _server = new FakeServer();
+        _server = new FakeServer(true);
         _client = await IgniteClient.StartAsync(new IgniteClientConfiguration(_server.Endpoint));
     }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetMultiThreadedBenchmarks.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetMultiThreadedBenchmarks.cs
index aecd2675f4..8b1f2f3690 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetMultiThreadedBenchmarks.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/TableGetMultiThreadedBenchmarks.cs
@@ -51,7 +51,7 @@ public class TableGetMultiThreadedBenchmarks
     {
         // Use a delay on server to imitate some work.
         _servers = Enumerable.Range(0, ServerCount)
-            .Select(_ => new FakeServer { OperationDelay = TimeSpan.FromMilliseconds(5) })
+            .Select(_ => new FakeServer(true) { OperationDelay = TimeSpan.FromMilliseconds(5) })
             .ToList();
 
         _client = await IgniteClient.StartAsync(new IgniteClientConfiguration(_servers.Select(s => s.Endpoint).ToArray()));
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index eac861de1b..43c796c822 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -82,8 +82,8 @@ namespace Apache.Ignite.Tests.Compute
         [Test]
         public async Task TestClientRetriesComputeJobOnPrimaryAndDefaultNodes()
         {
-            using var server1 = new FakeServer(shouldDropConnection: cnt => cnt % 2 == 0, nodeName: "s1");
-            using var server2 = new FakeServer(shouldDropConnection: cnt => cnt % 2 == 0, nodeName: "s2");
+            using var server1 = new FakeServer(shouldDropConnection: ctx => ctx.RequestCount % 2 == 0, nodeName: "s1");
+            using var server2 = new FakeServer(shouldDropConnection: ctx => ctx.RequestCount % 2 == 0, nodeName: "s2");
 
             var clientCfg = new IgniteClientConfiguration
             {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index e7c78ee458..cca87ea907 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -58,7 +58,7 @@ namespace Apache.Ignite.Tests
 
         private readonly CancellationTokenSource _cts = new();
 
-        private readonly Func<int, bool> _shouldDropConnection;
+        private readonly Func<RequestContext, bool> _shouldDropConnection;
 
         private readonly ConcurrentQueue<ClientOp>? _ops;
 
@@ -70,8 +70,14 @@ namespace Apache.Ignite.Tests
 
         private volatile bool _dropNewConnections;
 
-        public FakeServer(
-            Func<int, bool>? shouldDropConnection = null,
+        public FakeServer(bool disableOpsTracking)
+            : this(null, disableOpsTracking: disableOpsTracking)
+        {
+            // No-op.
+        }
+
+        internal FakeServer(
+            Func<RequestContext, bool>? shouldDropConnection = null,
             string nodeName = "fake-server",
             bool disableOpsTracking = false)
         {
@@ -491,15 +497,15 @@ namespace Apache.Ignite.Tests
                         Thread.Sleep(OperationDelay);
                     }
 
-                    if (_shouldDropConnection(++requestCount))
-                    {
-                        break;
-                    }
-
                     var reader = new MsgPackReader(msg.AsMemory().Span);
                     var opCode = (ClientOp)reader.ReadInt32();
                     var requestId = reader.ReadInt64();
 
+                    if (_shouldDropConnection(new RequestContext(++requestCount, opCode, requestId)))
+                    {
+                        break;
+                    }
+
                     _ops?.Enqueue(opCode);
 
                     switch (opCode)
@@ -632,6 +638,8 @@ namespace Apache.Ignite.Tests
             }
         }
 
+        internal record struct RequestContext(int RequestCount, ClientOp OpCode, long RequestId);
+
         [SuppressMessage("Design", "CA1032:Implement standard exception constructors", Justification = "Tests.")]
         [SuppressMessage("Design", "CA1064:Exceptions should be public", Justification = "Tests.")]
         private class ConnectionLostException : Exception
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
index 2e0bb0f4a6..a42a98133d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServerTests.cs
@@ -68,7 +68,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = RetryNonePolicy.Instance
             };
 
-            using var server = new FakeServer(reqId => reqId % 3 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 3 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             // 2 requests succeed, 3rd fails.
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index be07cd7888..2f71b2d421 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -189,7 +189,7 @@ public class MetricsTests
     [Test]
     public async Task TestRequestsRetried()
     {
-        using var server = new FakeServer(shouldDropConnection: idx => idx is > 1 and < 5);
+        using var server = new FakeServer(shouldDropConnection: ctx => ctx.RequestCount is > 1 and < 5);
         using var client = await server.ConnectClientAsync();
 
         await client.Tables.GetTablesAsync();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
index ad1242b066..d76c907eaa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
@@ -38,7 +38,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = new RetryLimitPolicy { RetryLimit = 1 }
             };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             for (int i = 0; i < IterCount; i++)
@@ -52,7 +52,7 @@ namespace Apache.Ignite.Tests
         {
             var cfg = new IgniteClientConfiguration { RetryPolicy = new RetryLimitPolicy() };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             var ex = Assert.ThrowsAsync<IgniteException>(async () => await client.Tables.GetTableAsync("bad-table"));
@@ -65,7 +65,7 @@ namespace Apache.Ignite.Tests
             var testRetryPolicy = new TestRetryPolicy();
             var cfg = new IgniteClientConfiguration { RetryPolicy = testRetryPolicy };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             var tx = await client.Transactions.BeginAsync();
@@ -82,7 +82,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = new TestRetryPolicy { RetryLimit = 5 }
             };
 
-            using var server = new FakeServer(reqId => reqId > 1);
+            using var server = new FakeServer(ctx => ctx.RequestCount > 1);
             using var client = await server.ConnectClientAsync(cfg);
 
             await client.Tables.GetTablesAsync();
@@ -94,7 +94,7 @@ namespace Apache.Ignite.Tests
         [Test]
         public async Task TestFailoverWithRetryPolicyThrowsOnDefaultRetryLimitExceeded()
         {
-            using var server = new FakeServer(reqId => reqId > 1);
+            using var server = new FakeServer(ctx => ctx.RequestCount > 1);
             using var client = await server.ConnectClientAsync();
 
             await client.Tables.GetTablesAsync();
@@ -114,7 +114,7 @@ namespace Apache.Ignite.Tests
                 }
             };
 
-            using var server = new FakeServer(reqId => reqId % 30 != 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 30 != 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             for (var i = 0; i < IterCount; i++)
@@ -126,7 +126,7 @@ namespace Apache.Ignite.Tests
         [Test]
         public async Task TestRetryPolicyIsDisabledByDefault()
         {
-            using var server = new FakeServer(reqId => reqId > 1);
+            using var server = new FakeServer(ctx => ctx.RequestCount > 1);
             using var client = await server.ConnectClientAsync();
 
             await client.Tables.GetTablesAsync();
@@ -142,7 +142,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = null!
             };
 
-            using var server = new FakeServer(reqId => reqId > 1);
+            using var server = new FakeServer(ctx => ctx.RequestCount > 1);
             using var client = await server.ConnectClientAsync(cfg);
 
             await client.Tables.GetTablesAsync();
@@ -160,7 +160,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = testRetryPolicy
             };
 
-            using var server = new FakeServer(reqId => reqId % 3 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 3 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             for (var i = 0; i < IterCount; i++)
@@ -188,7 +188,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = testRetryPolicy
             };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             await client.Tables.GetTablesAsync();
@@ -205,7 +205,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = new TestRetryPolicy { RetryLimit = 1 }
             };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             for (int i = 0; i < IterCount; i++)
@@ -224,7 +224,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = new TestRetryPolicy()
             };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
             var tx = await client.Transactions.BeginAsync();
 
@@ -242,7 +242,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = new TestRetryPolicy { RetryLimit = 1 }
             };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             for (int i = 0; i < IterCount; i++)
@@ -260,7 +260,7 @@ namespace Apache.Ignite.Tests
                 RetryPolicy = new RetryReadPolicy()
             };
 
-            using var server = new FakeServer(reqId => reqId % 2 == 0);
+            using var server = new FakeServer(ctx => ctx.RequestCount % 2 == 0);
             using var client = await server.ConnectClientAsync(cfg);
 
             var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaUpdateTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaUpdateTest.cs
new file mode 100644
index 0000000000..2c4eec63b5
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaUpdateTest.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Table;
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Ignite.Table;
+using Internal.Proto;
+using Internal.Table;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests schema update logic.
+/// </summary>
+public class SchemaUpdateTest
+{
+    [Test]
+    public async Task TestMultipleParallelOperationsRequestSchemaOnce()
+    {
+        using var server = new FakeServer
+        {
+            OperationDelay = TimeSpan.FromMilliseconds(100)
+        };
+
+        using var client = await server.ConnectClientAsync();
+        var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+        var view = table!.RecordBinaryView;
+
+        // Schema is not known initially, so both operations will request it.
+        // However, we cache the request task, so only one request will be sent.
+        var task1 = view.UpsertAsync(null, new IgniteTuple { ["id"] = 1 });
+        var task2 = view.UpsertAsync(null, new IgniteTuple { ["id"] = 2 });
+
+        await Task.WhenAll(task1, task2);
+        Assert.AreEqual(1, server.ClientOps.Count(x => x == ClientOp.SchemasGet), string.Join(", ", server.ClientOps));
+
+        var schemas = table.GetFieldValue<IDictionary<int, Task<Schema>>>("_schemas");
+
+        // Same schema is cached as "Unknown latest" (-1) and specific version (1).
+        CollectionAssert.AreEquivalent(new[] { -1, 1 }, schemas.Keys);
+        Assert.AreEqual(1, schemas[-1].GetAwaiter().GetResult().Version);
+        Assert.AreEqual(1, schemas[1].GetAwaiter().GetResult().Version);
+    }
+
+    [Test]
+    public async Task TestFailedSchemaLoadTaskIsRetried()
+    {
+        using var server = new FakeServer(shouldDropConnection: ctx => ctx is { OpCode: ClientOp.SchemasGet, RequestCount: < 3 });
+
+        var cfg = new IgniteClientConfiguration
+        {
+            RetryPolicy = new RetryNonePolicy()
+        };
+
+        using var client = await server.ConnectClientAsync(cfg);
+
+        var table = await client.Tables.GetTableAsync(FakeServer.ExistingTableName);
+        var view = table!.RecordBinaryView;
+        var schemas = table.GetFieldValue<IDictionary<int, Task<Schema>>>("_schemas");
+
+        // First operation fails because server drops connection.
+        Assert.ThrowsAsync<IgniteClientConnectionException>(async () => await view.UpsertAsync(null, new IgniteTuple { ["id"] = 1 }));
+        Assert.IsTrue(schemas[-1].IsFaulted);
+
+        // Second operation should ignore failed task and create a new one, which will succeed.
+        await view.UpsertAsync(null, new IgniteTuple { ["id"] = 1 });
+        Assert.IsTrue(schemas[-1].IsCompletedSuccessfully);
+        Assert.IsTrue(schemas[1].IsCompletedSuccessfully);
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
index 5feb90762d..0c87abcad0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
@@ -62,6 +62,14 @@ namespace Apache.Ignite.Tests
             Assert.Fail(message);
         }
 
+        public static T GetFieldValue<T>(this object obj, string fieldName)
+        {
+            var field = obj.GetType().GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic);
+            Assert.IsNotNull(field, $"Field '{fieldName}' not found in '{obj.GetType()}'");
+
+            return (T) field!.GetValue(obj)!;
+        }
+
         private static string GetSolutionDir()
         {
             var dir = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index a7d310657c..6a310e7eb3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Internal.Table
 {
     using System;
     using System.Collections.Concurrent;
+    using System.Collections.Generic;
     using System.Diagnostics;
     using System.Threading;
     using System.Threading.Tasks;
@@ -38,6 +39,9 @@ namespace Apache.Ignite.Internal.Table
     /// </summary>
     internal sealed class Table : ITable
     {
+        /** Unknown schema version. */
+        private const int UnknownSchemaVersion = -1;
+
         /** Socket. */
         private readonly ClientFailoverSocket _socket;
 
@@ -45,7 +49,7 @@ namespace Apache.Ignite.Internal.Table
         private readonly Sql _sql;
 
         /** Schemas. */
-        private readonly ConcurrentDictionary<int, Schema> _schemas = new();
+        private readonly ConcurrentDictionary<int, Task<Schema>> _schemas = new();
 
         /** Cached record views. */
         private readonly ConcurrentDictionary<Type, object> _recordViews = new();
@@ -60,7 +64,7 @@ namespace Apache.Ignite.Internal.Table
         private readonly SemaphoreSlim _partitionAssignmentSemaphore = new(1);
 
         /** */
-        private volatile int _latestSchemaVersion = -1;
+        private volatile int _latestSchemaVersion = UnknownSchemaVersion;
 
         /** */
         private volatile int _partitionAssignmentVersion = -1;
@@ -155,32 +159,23 @@ namespace Apache.Ignite.Internal.Table
         /// </summary>
         /// <param name="buf">Buffer.</param>
         /// <returns>Schema or null.</returns>
-        internal async ValueTask<Schema> ReadSchemaAsync(PooledBuffer buf)
+        internal Task<Schema> ReadSchemaAsync(PooledBuffer buf)
         {
-            var ver = buf.GetReader().ReadInt32();
-
-            if (_schemas.TryGetValue(ver, out var res))
-            {
-                return res;
-            }
+            var version = buf.GetReader().ReadInt32();
 
-            return await LoadSchemaAsync(ver).ConfigureAwait(false);
+            return GetCachedSchemaAsync(version);
         }
 
         /// <summary>
         /// Gets the latest schema.
         /// </summary>
         /// <returns>Schema.</returns>
-        internal async ValueTask<Schema> GetLatestSchemaAsync()
+        internal Task<Schema> GetLatestSchemaAsync()
         {
-            var latestSchemaVersion = _latestSchemaVersion;
-
-            if (latestSchemaVersion >= 0)
-            {
-                return _schemas[latestSchemaVersion];
-            }
-
-            return await LoadSchemaAsync(null).ConfigureAwait(false);
+            // _latestSchemaVersion can be -1 (unknown) or a valid version.
+            // In case of unknown version, we request latest from the server and cache it with -1 key
+            // to avoid duplicate requests for latest schema.
+            return GetCachedSchemaAsync(_latestSchemaVersion);
         }
 
         /// <summary>
@@ -210,6 +205,23 @@ namespace Apache.Ignite.Internal.Table
             return PreferredNode.FromId(nodeId);
         }
 
+        private Task<Schema> GetCachedSchemaAsync(int version)
+        {
+            var task = GetOrAdd();
+
+            if (!task.IsFaulted)
+            {
+                return task;
+            }
+
+            // Do not return failed task. Remove it from the cache and try again.
+            _schemas.TryRemove(new KeyValuePair<int, Task<Schema>>(version, task));
+
+            return GetOrAdd();
+
+            Task<Schema> GetOrAdd() => _schemas.GetOrAdd(version, static (ver, tbl) => tbl.LoadSchemaAsync(ver), this);
+        }
+
         private async ValueTask<string[]?> GetPartitionAssignmentAsync()
         {
             var socketVer = _socket.PartitionAssignmentVersion;
@@ -251,7 +263,7 @@ namespace Apache.Ignite.Internal.Table
         /// </summary>
         /// <param name="version">Version.</param>
         /// <returns>Schema.</returns>
-        private async Task<Schema> LoadSchemaAsync(int? version)
+        private async Task<Schema> LoadSchemaAsync(int version)
         {
             using var writer = ProtoCommon.GetMessageWriter();
             Write();
@@ -264,14 +276,14 @@ namespace Apache.Ignite.Internal.Table
                 var w = writer.MessageWriter;
                 w.Write(Id);
 
-                if (version == null)
+                if (version == UnknownSchemaVersion)
                 {
                     w.WriteNil();
                 }
                 else
                 {
                     w.WriteArrayHeader(1);
-                    w.Write(version.Value);
+                    w.Write(version);
                 }
             }
 
@@ -338,8 +350,7 @@ namespace Apache.Ignite.Internal.Table
             }
 
             var schema = new Schema(schemaVersion, keyColumnCount, columns);
-
-            _schemas[schemaVersion] = schema;
+            _schemas[schemaVersion] = Task.FromResult(schema);
 
             if (_logger?.IsEnabled(LogLevel.Debug) == true)
             {