You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/05/17 13:44:27 UTC

[ignite-3] branch main updated: IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 32afeaaf0 IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)
32afeaaf0 is described below

commit 32afeaaf02d8eb4d0d41f8cefb49263f52976ede
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue May 17 16:44:22 2022 +0300

    IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)
    
    * Implement `ExecuteColocated` in .NET client. Send requests to default node, partition awareness will be added later (IGNITE-16930).
    * To avoid extra table request on every `ExecuteColocated` call (we need table id and schemas), cache tables by name. If a table gets dropped and created again with the same name and a different id, retry the operation.
---
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  79 +++++++++++++-
 .../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs  |   9 +-
 .../RawSocketConnectionTests.cs                    |   1 +
 .../Table/RecordViewBinaryTests.cs                 |   6 ++
 .../Table/RecordViewPocoTests.cs                   |   6 ++
 .../Apache.Ignite.Tests/Table/TablesTests.cs       |   4 +-
 .../Transactions/TransactionsTests.cs              |   6 ++
 .../dotnet/Apache.Ignite/ClientErrorCode.cs        |   7 +-
 .../dotnet/Apache.Ignite/Compute/ICompute.cs       |  25 +++++
 .../dotnet/Apache.Ignite/IgniteClientException.cs  |  14 +--
 .../Apache.Ignite/Internal/Compute/Compute.cs      | 119 ++++++++++++++++++++-
 .../Apache.Ignite/Internal/IgniteClientInternal.cs |   8 +-
 .../Apache.Ignite/Internal/Proto/ClientOp.cs       |   5 +-
 .../Internal/Proto/ClientOpExtensions.cs           |   1 +
 .../Apache.Ignite/Internal/Table/RecordView.cs     |   5 +
 .../Table/Serialization/RecordSerializer.cs        |  29 ++++-
 .../dotnet/Apache.Ignite/Internal/Table/Table.cs   |  42 +++-----
 .../dotnet/Apache.Ignite/Internal/Table/Tables.cs  |  67 +++++++-----
 .../runner/app/PlatformTestNodeRunner.java         |  38 +++++++
 19 files changed, 395 insertions(+), 76 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 35b1d38f5..1c0384c4f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -23,25 +23,33 @@ namespace Apache.Ignite.Tests.Compute
     using System.Net;
     using System.Threading.Tasks;
     using Ignite.Compute;
+    using Ignite.Table;
     using Internal.Network;
     using Network;
     using NUnit.Framework;
+    using Table;
 
     /// <summary>
     /// Tests <see cref="ICompute"/>.
     /// </summary>
     public class ComputeTests : IgniteTestsBase
     {
-        private const string ConcatJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ConcatJob";
+        private const string ItThinClientComputeTest = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest";
 
-        private const string NodeNameJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$NodeNameJob";
+        private const string ConcatJob = ItThinClientComputeTest + "$ConcatJob";
 
-        private const string ErrorJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ErrorJob";
+        private const string NodeNameJob = ItThinClientComputeTest + "$NodeNameJob";
 
-        private const string EchoJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$EchoJob";
+        private const string ErrorJob = ItThinClientComputeTest + "$ErrorJob";
+
+        private const string EchoJob = ItThinClientComputeTest + "$EchoJob";
 
         private const string PlatformTestNodeRunner = "org.apache.ignite.internal.runner.app.PlatformTestNodeRunner";
 
+        private const string CreateTableJob = PlatformTestNodeRunner + "$CreateTableJob";
+
+        private const string DropTableJob = PlatformTestNodeRunner + "$DropTableJob";
+
         [Test]
         public async Task TestGetClusterNodes()
         {
@@ -177,6 +185,69 @@ namespace Apache.Ignite.Tests.Compute
             }
         }
 
+        [Test]
+        [TestCase(1, "")]
+        [TestCase(2, "_2")]
+        [TestCase(3, "")]
+        [TestCase(5, "_2")]
+        public async Task TestExecuteColocated(int key, string nodeName)
+        {
+            var keyTuple = new IgniteTuple { [KeyCol] = key };
+            var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, NodeNameJob);
+
+            var keyPoco = new Poco { Key = key };
+            var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco, NodeNameJob);
+
+            var expectedNodeName = PlatformTestNodeRunner + nodeName;
+            Assert.AreEqual(expectedNodeName, resNodeName);
+            Assert.AreEqual(expectedNodeName, resNodeName2);
+        }
+
+        [Test]
+        public void TestExecuteColocatedThrowsWhenTableDoesNotExist()
+        {
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
+                await Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(), EchoJob));
+
+            Assert.AreEqual("Table 'unknownTable' does not exist.", ex!.Message);
+        }
+
+        [Test]
+        public void TestExecuteColocatedThrowsWhenKeyColumnIsMissing()
+        {
+            var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
+                await Client.Compute.ExecuteColocatedAsync<string>(TableName, new IgniteTuple(), EchoJob));
+
+            StringAssert.Contains("Missed key column: KEY", ex!.Message);
+        }
+
+        [Test]
+        public async Task TestExecuteColocatedUpdatesTableCacheOnTableDrop()
+        {
+            // Create table and use it in ExecuteColocated.
+            var nodes = await GetNodeAsync(0);
+            var tableName = await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, "PUB.drop-me");
+
+            try
+            {
+                var keyTuple = new IgniteTuple { [KeyCol] = 1 };
+                var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+
+                // Drop table and create a new one with a different ID, then execute a computation again.
+                // This should update the cached table and complete the computation successfully.
+                await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName);
+                await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, tableName);
+
+                var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+
+                Assert.AreEqual(resNodeName, resNodeName2);
+            }
+            finally
+            {
+                await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName);
+            }
+        }
+
         private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
             (await Client.GetClusterNodesAsync()).OrderBy(n => n.Name).Skip(index).Take(1).ToList();
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index 68486aa07..2ed3bad6e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -18,7 +18,6 @@
 namespace Apache.Ignite.Tests
 {
     using System;
-    using System.Linq;
     using System.Threading.Tasks;
     using Ignite.Table;
     using Log;
@@ -30,7 +29,7 @@ namespace Apache.Ignite.Tests
     /// </summary>
     public class IgniteTestsBase
     {
-        protected const string TableName = "PUB.tbl1";
+        protected const string TableName = "PUB.TBL1";
 
         protected const string KeyCol = "key";
 
@@ -74,7 +73,7 @@ namespace Apache.Ignite.Tests
         [OneTimeTearDown]
         public void OneTimeTearDown()
         {
-            // ReSharper disable once ConstantConditionalAccessQualifier
+            // ReSharper disable once ConstantConditionalAccessQualifier, ConditionalAccessQualifierIsNonNullableAccordingToAPIContract
             Client?.Dispose();
 
             Assert.Greater(_eventListener.BuffersRented, 0);
@@ -83,10 +82,8 @@ namespace Apache.Ignite.Tests
         }
 
         [TearDown]
-        public async Task TearDown()
+        public void TearDown()
         {
-            await TupleView.DeleteAllAsync(null, Enumerable.Range(-5, 20).Select(x => GetTuple(x)));
-
             Assert.AreEqual(_eventListener.BuffersReturned, _eventListener.BuffersRented);
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
index 86a68ca4e..df2702cc6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+// ReSharper disable MustUseReturnValue
 namespace Apache.Ignite.Tests
 {
     using System;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
index 19ce6b691..5a3ad6662 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
@@ -30,6 +30,12 @@ namespace Apache.Ignite.Tests.Table
     /// </summary>
     public class RecordViewBinaryTests : IgniteTestsBase
     {
+        [TearDown]
+        public async Task CleanTable()
+        {
+            await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x)));
+        }
+
         [Test]
         public async Task TestUpsertGet()
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
index 11de0746f..0f12adaeb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
@@ -29,6 +29,12 @@ namespace Apache.Ignite.Tests.Table
     /// </summary>
     public class RecordViewPocoTests : IgniteTestsBase
     {
+        [TearDown]
+        public async Task CleanTable()
+        {
+            await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x)));
+        }
+
         [Test]
         public async Task TestUpsertGet()
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs
index 0edc3f73e..ab082298f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs
@@ -33,7 +33,7 @@ namespace Apache.Ignite.Tests.Table
             var tables = await Client.Tables.GetTablesAsync();
 
             Assert.AreEqual(1, tables.Count);
-            Assert.AreEqual("PUB.TBL1", tables[0].Name);
+            Assert.AreEqual(TableName, tables[0].Name);
         }
 
         [Test]
@@ -42,7 +42,7 @@ namespace Apache.Ignite.Tests.Table
             var table = await Client.Tables.GetTableAsync(TableName);
 
             Assert.IsNotNull(table);
-            Assert.AreEqual("PUB.tbl1", table!.Name);
+            Assert.AreEqual(TableName, table!.Name);
         }
 
         [Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 6b5038b16..7d373ced1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -28,6 +28,12 @@ namespace Apache.Ignite.Tests.Transactions
     /// </summary>
     public class TransactionsTests : IgniteTestsBase
     {
+        [TearDown]
+        public async Task CleanTable()
+        {
+            await TupleView.DeleteAllAsync(null, Enumerable.Range(1, 2).Select(x => GetTuple(x)));
+        }
+
         [Test]
         public async Task TestRecordViewBinaryOperations()
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs b/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs
index 1c748b7b1..2b34b529c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs
@@ -35,6 +35,11 @@ namespace Apache.Ignite
         /// <summary>
         /// Authentication or authorization failure.
         /// </summary>
-        AuthFailed = 2
+        AuthFailed = 2,
+
+        /// <summary>
+        /// Table id does not exist.
+        /// </summary>
+        TableIdDoesNotExist = 3
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 09b0edd15..8c893bd6d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Compute
     using System.Collections.Generic;
     using System.Threading.Tasks;
     using Network;
+    using Table;
 
     /// <summary>
     /// Ignite Compute API provides distributed job execution functionality.
@@ -36,6 +37,30 @@ namespace Apache.Ignite.Compute
         /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
         Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args);
 
+        /// <summary>
+        /// Executes a job represented by the given class on one node where the given key is located.
+        /// </summary>
+        /// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
+        /// <param name="key">Table key to be used to determine the target node for job execution.</param>
+        /// <param name="jobClassName">Java class name of the job to execute.</param>
+        /// <param name="args">Job arguments.</param>
+        /// <typeparam name="T">Job result type.</typeparam>
+        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+        Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args);
+
+        /// <summary>
+        /// Executes a job represented by the given class on one node where the given key is located.
+        /// </summary>
+        /// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
+        /// <param name="key">Table key to be used to determine the target node for job execution.</param>
+        /// <param name="jobClassName">Java class name of the job to execute.</param>
+        /// <param name="args">Job arguments.</param>
+        /// <typeparam name="T">Job result type.</typeparam>
+        /// <typeparam name="TKey">Key type.</typeparam>
+        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+        Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args)
+            where TKey : class; // TODO: Remove class constraint (IGNITE-16355)
+
         /// <summary>
         /// Executes a compute job represented by the given class on all of the specified nodes.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs
index 026262f38..29d44989d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs
@@ -29,9 +29,6 @@ namespace Apache.Ignite
         /** Error code field. */
         private const string ErrorCodeField = "StatusCode";
 
-        /** Error code. */
-        private readonly ClientErrorCode _errorCode;
-
         /// <summary>
         /// Initializes a new instance of the <see cref="IgniteClientException"/> class.
         /// </summary>
@@ -70,7 +67,7 @@ namespace Apache.Ignite
         public IgniteClientException(string message, Exception? cause, ClientErrorCode statusCode)
             : base(message, cause)
         {
-            _errorCode = statusCode;
+            ErrorCode = statusCode;
         }
 
         /// <summary>
@@ -81,9 +78,14 @@ namespace Apache.Ignite
         protected IgniteClientException(SerializationInfo info, StreamingContext ctx)
             : base(info, ctx)
         {
-            _errorCode = (ClientErrorCode) info.GetInt32(ErrorCodeField);
+            ErrorCode = (ClientErrorCode)info.GetInt32(ErrorCodeField);
         }
 
+        /// <summary>
+        /// Gets the error code.
+        /// </summary>
+        public ClientErrorCode ErrorCode { get; }
+
         /// <summary>
         /// When overridden in a derived class, sets the <see cref="SerializationInfo" />
         /// with information about the exception.
@@ -96,7 +98,7 @@ namespace Apache.Ignite
         {
             base.GetObjectData(info, context);
 
-            info.AddValue(ErrorCodeField, (int) _errorCode);
+            info.AddValue(ErrorCodeField, (int) ErrorCode);
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 248818b34..0cf47f6ff 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -17,6 +17,8 @@
 
 namespace Apache.Ignite.Internal.Compute
 {
+    using System;
+    using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Linq;
     using System.Threading.Tasks;
@@ -24,7 +26,10 @@ namespace Apache.Ignite.Internal.Compute
     using Common;
     using Ignite.Compute;
     using Ignite.Network;
+    using Ignite.Table;
     using Proto;
+    using Table;
+    using Table.Serialization;
 
     /// <summary>
     /// Compute API.
@@ -34,13 +39,21 @@ namespace Apache.Ignite.Internal.Compute
         /** Socket. */
         private readonly ClientFailoverSocket _socket;
 
+        /** Tables. */
+        private readonly Tables _tables;
+
+        /** Cached tables. */
+        private readonly ConcurrentDictionary<string, Table> _tableCache = new();
+
         /// <summary>
         /// Initializes a new instance of the <see cref="Compute"/> class.
         /// </summary>
         /// <param name="socket">Socket.</param>
-        public Compute(ClientFailoverSocket socket)
+        /// <param name="tables">Tables.</param>
+        public Compute(ClientFailoverSocket socket, Tables tables)
         {
             _socket = socket;
+            _tables = tables;
         }
 
         /// <inheritdoc/>
@@ -52,6 +65,27 @@ namespace Apache.Ignite.Internal.Compute
             return await ExecuteOnOneNode<T>(GetRandomNode(nodes), jobClassName, args).ConfigureAwait(false);
         }
 
+        /// <inheritdoc/>
+        public async Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args) =>
+            await ExecuteColocatedAsync<T, IIgniteTuple>(
+                    tableName,
+                    key,
+                    serializerHandlerFunc: _ => TupleSerializerHandler.Instance,
+                    jobClassName,
+                    args)
+                .ConfigureAwait(false);
+
+        /// <inheritdoc/>
+        public async Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args)
+            where TKey : class =>
+            await ExecuteColocatedAsync<T, TKey>(
+                    tableName,
+                    key,
+                    serializerHandlerFunc: table => table.GetRecordViewInternal<TKey>().RecordSerializer.Handler,
+                    jobClassName,
+                    args)
+                .ConfigureAwait(false);
+
         /// <inheritdoc/>
         public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args)
         {
@@ -82,7 +116,7 @@ namespace Apache.Ignite.Internal.Compute
         }
 
         private static ICollection<IClusterNode> GetNodesCollection(IEnumerable<IClusterNode> nodes) =>
-            nodes is ICollection<IClusterNode> col ? col : nodes.ToList();
+            nodes as ICollection<IClusterNode> ?? nodes.ToList();
 
         private async Task<T> ExecuteOnOneNode<T>(IClusterNode node, string jobClassName, object[] args)
         {
@@ -138,5 +172,86 @@ namespace Apache.Ignite.Internal.Compute
                 return (T)reader.ReadObjectWithType()!;
             }
         }
+
+        private async Task<Table> GetTableAsync(string tableName)
+        {
+            if (_tableCache.TryGetValue(tableName, out var cachedTable))
+            {
+                return cachedTable;
+            }
+
+            var table = await _tables.GetTableInternalAsync(tableName).ConfigureAwait(false);
+
+            if (table != null)
+            {
+                _tableCache[tableName] = table;
+                return table;
+            }
+
+            _tableCache.TryRemove(tableName, out _);
+
+            throw new IgniteClientException($"Table '{tableName}' does not exist.");
+        }
+
+        private async Task<T> ExecuteColocatedAsync<T, TKey>(
+            string tableName,
+            TKey key,
+            Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc,
+            string jobClassName,
+            params object[] args)
+            where TKey : class
+        {
+            // TODO: IGNITE-16990 - implement partition awareness.
+            IgniteArgumentCheck.NotNull(tableName, nameof(tableName));
+            IgniteArgumentCheck.NotNull(key, nameof(key));
+            IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
+
+            while (true)
+            {
+                var table = await GetTableAsync(tableName).ConfigureAwait(false);
+                var schema = await table.GetLatestSchemaAsync().ConfigureAwait(false);
+
+                using var bufferWriter = Write(table, schema);
+
+                try
+                {
+                    using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, bufferWriter).ConfigureAwait(false);
+
+                    return Read(res);
+                }
+                catch (IgniteClientException e) when (e.ErrorCode == ClientErrorCode.TableIdDoesNotExist)
+                {
+                    // Table was dropped - remove from cache.
+                    // Try again in case a new table with the same name exists.
+                    _tableCache.TryRemove(tableName, out _);
+                }
+            }
+
+            PooledArrayBufferWriter Write(Table table, Schema schema)
+            {
+                var bufferWriter = new PooledArrayBufferWriter();
+                var w = bufferWriter.GetMessageWriter();
+
+                w.Write(table.Id);
+                w.Write(schema.Version);
+
+                var serializerHandler = serializerHandlerFunc(table);
+                serializerHandler.Write(ref w, schema, key, true);
+
+                w.Write(jobClassName);
+                w.WriteObjectArrayWithTypes(args);
+
+                w.Flush();
+
+                return bufferWriter;
+            }
+
+            static T Read(in PooledBuffer buf)
+            {
+                var reader = buf.GetReader();
+
+                return (T)reader.ReadObjectWithType()!;
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 20dededa1..401ebe8d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -44,9 +44,13 @@ namespace Apache.Ignite.Internal
         public IgniteClientInternal(ClientFailoverSocket socket)
         {
             _socket = socket;
-            Tables = new Tables(socket);
+
+            var tables = new Tables(socket);
+            Tables = tables;
+
             Transactions = new Transactions.Transactions(socket);
-            Compute = new Compute.Compute(socket);
+
+            Compute = new Compute.Compute(socket, tables);
         }
 
         /// <inheritdoc/>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index b00eaa17b..72e89b0e9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -92,6 +92,9 @@ namespace Apache.Ignite.Internal.Proto
         ComputeExecute = 47,
 
         /** Get cluster nodes. */
-        ClusterGetNodes = 48
+        ClusterGetNodes = 48,
+
+        /** Execute compute job. */
+        ComputeExecuteColocated = 49
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 714a3fa06..c1d65d717 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -52,6 +52,7 @@ namespace Apache.Ignite.Internal.Proto
                 ClientOp.TupleDeleteAllExact => ClientOperationType.TupleDeleteAllExact,
                 ClientOp.TupleGetAndDelete => ClientOperationType.TupleGetAndDelete,
                 ClientOp.ComputeExecute => ClientOperationType.ComputeExecute,
+                ClientOp.ComputeExecuteColocated => ClientOperationType.ComputeExecute,
                 ClientOp.TxBegin => null,
                 ClientOp.TxCommit => null,
                 ClientOp.TxRollback => null,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 5f1edea60..71e568083 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -52,6 +52,11 @@ namespace Apache.Ignite.Internal.Table
             _ser = ser;
         }
 
+        /// <summary>
+        /// Gets the record serializer.
+        /// </summary>
+        public RecordSerializer<T> RecordSerializer => _ser;
+
         /// <inheritdoc/>
         public async Task<T?> GetAsync(ITransaction? transaction, T key)
         {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index f30a57d5b..ee0970ee2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
     using System.Collections.Generic;
     using Buffers;
     using MessagePack;
+    using Proto;
 
     /// <summary>
     /// Generic record serializer.
@@ -47,6 +48,11 @@ namespace Apache.Ignite.Internal.Table.Serialization
             _handler = handler;
         }
 
+        /// <summary>
+        /// Gets the handler.
+        /// </summary>
+        public IRecordSerializerHandler<T> Handler => _handler;
+
         /// <summary>
         /// Reads the value part.
         /// </summary>
@@ -195,7 +201,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
         {
             var w = buf.GetMessageWriter();
 
-            _table.WriteIdAndTx(ref w, tx);
+            WriteIdAndTx(ref w, tx);
             w.Write(schema.Version);
             w.Flush();
 
@@ -236,10 +242,29 @@ namespace Apache.Ignite.Internal.Table.Serialization
             T rec,
             bool keyOnly = false)
         {
-            _table.WriteIdAndTx(ref w, tx);
+            WriteIdAndTx(ref w, tx);
             w.Write(schema.Version);
 
             _handler.Write(ref w, schema, rec, keyOnly);
         }
+
+        /// <summary>
+        /// Writes table id and transaction id, if present.
+        /// </summary>
+        /// <param name="w">Writer.</param>
+        /// <param name="tx">Transaction.</param>
+        private void WriteIdAndTx(ref MessagePackWriter w, Transactions.Transaction? tx)
+        {
+            w.Write(_table.Id);
+
+            if (tx == null)
+            {
+                w.WriteNil();
+            }
+            else
+            {
+                w.Write(tx.Id);
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 943c2d426..ff8392350 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -35,9 +35,6 @@ namespace Apache.Ignite.Internal.Table
         /** Socket. */
         private readonly ClientFailoverSocket _socket;
 
-        /** Table id. */
-        private readonly Guid _id;
-
         /** Schemas. */
         private readonly ConcurrentDictionary<int, Schema> _schemas = new();
 
@@ -60,7 +57,7 @@ namespace Apache.Ignite.Internal.Table
         {
             _socket = socket;
             Name = name;
-            _id = id;
+            Id = id;
 
             RecordBinaryView = new RecordView<IIgniteTuple>(
                 this,
@@ -78,35 +75,30 @@ namespace Apache.Ignite.Internal.Table
         /// </summary>
         internal ClientFailoverSocket Socket => _socket;
 
+        /// <summary>
+        /// Gets the table id.
+        /// </summary>
+        internal Guid Id { get; }
+
         /// <inheritdoc/>
         public IRecordView<T> GetRecordView<T>()
+            where T : class =>
+            GetRecordViewInternal<T>();
+
+        /// <summary>
+        /// Gets the record view for the specified type.
+        /// </summary>
+        /// <typeparam name="T">Record type.</typeparam>
+        /// <returns>Record view.</returns>
+        internal RecordView<T> GetRecordViewInternal<T>()
             where T : class
         {
             // ReSharper disable once HeapView.CanAvoidClosure (generics prevent this)
-            return (IRecordView<T>)_recordViews.GetOrAdd(
+            return (RecordView<T>)_recordViews.GetOrAdd(
                 typeof(T),
                 _ => new RecordView<T>(this, new RecordSerializer<T>(this, new ObjectSerializerHandler<T>())));
         }
 
-        /// <summary>
-        /// Writes the transaction id, if present.
-        /// </summary>
-        /// <param name="w">Writer.</param>
-        /// <param name="tx">Transaction.</param>
-        internal void WriteIdAndTx(ref MessagePackWriter w, Transactions.Transaction? tx)
-        {
-            w.Write(_id);
-
-            if (tx == null)
-            {
-                w.WriteNil();
-            }
-            else
-            {
-                w.Write(tx.Id);
-            }
-        }
-
         /// <summary>
         /// Reads the schema.
         /// </summary>
@@ -168,7 +160,7 @@ namespace Apache.Ignite.Internal.Table
             void Write()
             {
                 var w = writer.GetMessageWriter();
-                w.Write(_id);
+                w.Write(Id);
 
                 if (version == null)
                 {
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
index b8392099b..ef8ddd8f0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
@@ -35,8 +35,8 @@ namespace Apache.Ignite.Internal.Table
         /** Socket. */
         private readonly ClientFailoverSocket _socket;
 
-        /** Cached tables. */
-        private readonly ConcurrentDictionary<Guid, ITable> _tables = new();
+        /** Cached tables. Caching here is required to retain schema and serializer caches in <see cref="Table"/>. */
+        private readonly ConcurrentDictionary<Guid, Table> _tables = new();
 
         /// <summary>
         /// Initializes a new instance of the <see cref="Tables"/> class.
@@ -50,28 +50,7 @@ namespace Apache.Ignite.Internal.Table
         /// <inheritdoc/>
         public async Task<ITable?> GetTableAsync(string name)
         {
-            IgniteArgumentCheck.NotNull(name, nameof(name));
-
-            using var writer = new PooledArrayBufferWriter();
-            Write(writer.GetMessageWriter());
-
-            using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false);
-            return Read(resBuf.GetReader());
-
-            void Write(MessagePackWriter w)
-            {
-                w.Write(name);
-                w.Flush();
-            }
-
-            // ReSharper disable once LambdaExpressionMustBeStatic (requires .NET 5+)
-            ITable? Read(MessagePackReader r) =>
-                r.NextMessagePackType == MessagePackType.Nil
-                    ? null
-                    : _tables.GetOrAdd(
-                        r.ReadGuid(),
-                        (Guid id, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id, arg.Socket),
-                        (name, _socket));
+            return await GetTableInternalAsync(name).ConfigureAwait(false);
         }
 
         /// <inheritdoc/>
@@ -90,11 +69,49 @@ namespace Apache.Ignite.Internal.Table
                 {
                     var id = r.ReadGuid();
                     var name = r.ReadString();
-                    res.Add(new Table(name, id, _socket));
+
+                    // ReSharper disable once LambdaExpressionMustBeStatic (not supported by .NET Core 3.1, TODO IGNITE-16994)
+                    var table = _tables.GetOrAdd(
+                        id,
+                        (Guid id0, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id0, arg.Socket),
+                        (name, _socket));
+
+                    res.Add(table);
                 }
 
                 return res;
             }
         }
+
+        /// <summary>
+        /// Gets the table by name.
+        /// </summary>
+        /// <param name="name">Name.</param>
+        /// <returns>Table.</returns>
+        internal async Task<Table?> GetTableInternalAsync(string name)
+        {
+            IgniteArgumentCheck.NotNull(name, nameof(name));
+
+            using var writer = new PooledArrayBufferWriter();
+            Write(writer.GetMessageWriter());
+
+            using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false);
+            return Read(resBuf.GetReader());
+
+            void Write(MessagePackWriter w)
+            {
+                w.Write(name);
+                w.Flush();
+            }
+
+            // ReSharper disable once LambdaExpressionMustBeStatic (requires .NET 5+)
+            Table? Read(MessagePackReader r) =>
+                r.NextMessagePackType == MessagePackType.Nil
+                    ? null
+                    : _tables.GetOrAdd(
+                        r.ReadGuid(),
+                        (Guid id, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id, arg.Socket),
+                        (name, _socket));
+        }
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index a0acb8e5c..4e14c9e8e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.schema.SchemaBuilders;
 import org.apache.ignite.schema.definition.ColumnType;
 import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.Table;
 
 /**
  * Helper class for non-Java platform tests (.NET, C++, Python, ...). Starts nodes, populates tables and data for tests.
@@ -150,4 +153,39 @@ public class PlatformTestNodeRunner {
     private static int getPort(IgniteImpl node) {
         return node.clientAddress().port();
     }
+
+    /**
+     * Compute job that creates a table.
+     */
+    @SuppressWarnings({"unused"}) // Used by platform tests.
+    private static class CreateTableJob implements ComputeJob<String> {
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            String tableName = (String) args[0];
+
+            Table table = context.ignite().tables().createTable(
+                    tableName,
+                    tblChanger -> tblChanger
+                            .changeColumns(cols ->
+                                    cols.create("key", col -> col.changeType(t -> t.changeType("INT64")).changeNullable(false)))
+                            .changePrimaryKey(pk -> pk.changeColumns("key").changeColocationColumns("key"))
+            );
+
+            return table.name();
+        }
+    }
+
+    /**
+     * Compute job that drops a table.
+     */
+    @SuppressWarnings({"unused"}) // Used by platform tests.
+    private static class DropTableJob implements ComputeJob<String> {
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            String tableName = (String) args[0];
+            context.ignite().tables().dropTable(tableName);
+
+            return tableName;
+        }
+    }
 }