You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/05/17 13:44:27 UTC
[ignite-3] branch main updated: IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 32afeaaf0 IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)
32afeaaf0 is described below
commit 32afeaaf02d8eb4d0d41f8cefb49263f52976ede
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue May 17 16:44:22 2022 +0300
IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)
* Implement `ExecuteColocated` in .NET client. Send requests to default node, partition awareness will be added later (IGNITE-16930).
* To avoid extra table request on every `ExecuteColocated` call (we need table id and schemas), cache tables by name. If a table gets dropped and created again with the same name and a different id, retry the operation.
---
.../Apache.Ignite.Tests/Compute/ComputeTests.cs | 79 +++++++++++++-
.../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs | 9 +-
.../RawSocketConnectionTests.cs | 1 +
.../Table/RecordViewBinaryTests.cs | 6 ++
.../Table/RecordViewPocoTests.cs | 6 ++
.../Apache.Ignite.Tests/Table/TablesTests.cs | 4 +-
.../Transactions/TransactionsTests.cs | 6 ++
.../dotnet/Apache.Ignite/ClientErrorCode.cs | 7 +-
.../dotnet/Apache.Ignite/Compute/ICompute.cs | 25 +++++
.../dotnet/Apache.Ignite/IgniteClientException.cs | 14 +--
.../Apache.Ignite/Internal/Compute/Compute.cs | 119 ++++++++++++++++++++-
.../Apache.Ignite/Internal/IgniteClientInternal.cs | 8 +-
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 5 +-
.../Internal/Proto/ClientOpExtensions.cs | 1 +
.../Apache.Ignite/Internal/Table/RecordView.cs | 5 +
.../Table/Serialization/RecordSerializer.cs | 29 ++++-
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 42 +++-----
.../dotnet/Apache.Ignite/Internal/Table/Tables.cs | 67 +++++++-----
.../runner/app/PlatformTestNodeRunner.java | 38 +++++++
19 files changed, 395 insertions(+), 76 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 35b1d38f5..1c0384c4f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -23,25 +23,33 @@ namespace Apache.Ignite.Tests.Compute
using System.Net;
using System.Threading.Tasks;
using Ignite.Compute;
+ using Ignite.Table;
using Internal.Network;
using Network;
using NUnit.Framework;
+ using Table;
/// <summary>
/// Tests <see cref="ICompute"/>.
/// </summary>
public class ComputeTests : IgniteTestsBase
{
- private const string ConcatJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ConcatJob";
+ private const string ItThinClientComputeTest = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest";
- private const string NodeNameJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$NodeNameJob";
+ private const string ConcatJob = ItThinClientComputeTest + "$ConcatJob";
- private const string ErrorJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ErrorJob";
+ private const string NodeNameJob = ItThinClientComputeTest + "$NodeNameJob";
- private const string EchoJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$EchoJob";
+ private const string ErrorJob = ItThinClientComputeTest + "$ErrorJob";
+
+ private const string EchoJob = ItThinClientComputeTest + "$EchoJob";
private const string PlatformTestNodeRunner = "org.apache.ignite.internal.runner.app.PlatformTestNodeRunner";
+ private const string CreateTableJob = PlatformTestNodeRunner + "$CreateTableJob";
+
+ private const string DropTableJob = PlatformTestNodeRunner + "$DropTableJob";
+
[Test]
public async Task TestGetClusterNodes()
{
@@ -177,6 +185,69 @@ namespace Apache.Ignite.Tests.Compute
}
}
+ [Test]
+ [TestCase(1, "")]
+ [TestCase(2, "_2")]
+ [TestCase(3, "")]
+ [TestCase(5, "_2")]
+ public async Task TestExecuteColocated(int key, string nodeName)
+ {
+ var keyTuple = new IgniteTuple { [KeyCol] = key };
+ var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, NodeNameJob);
+
+ var keyPoco = new Poco { Key = key };
+ var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco, NodeNameJob);
+
+ var expectedNodeName = PlatformTestNodeRunner + nodeName;
+ Assert.AreEqual(expectedNodeName, resNodeName);
+ Assert.AreEqual(expectedNodeName, resNodeName2);
+ }
+
+ [Test]
+ public void TestExecuteColocatedThrowsWhenTableDoesNotExist()
+ {
+ var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
+ await Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(), EchoJob));
+
+ Assert.AreEqual("Table 'unknownTable' does not exist.", ex!.Message);
+ }
+
+ [Test]
+ public void TestExecuteColocatedThrowsWhenKeyColumnIsMissing()
+ {
+ var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
+ await Client.Compute.ExecuteColocatedAsync<string>(TableName, new IgniteTuple(), EchoJob));
+
+ StringAssert.Contains("Missed key column: KEY", ex!.Message);
+ }
+
+ [Test]
+ public async Task TestExecuteColocatedUpdatesTableCacheOnTableDrop()
+ {
+ // Create table and use it in ExecuteColocated.
+ var nodes = await GetNodeAsync(0);
+ var tableName = await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, "PUB.drop-me");
+
+ try
+ {
+ var keyTuple = new IgniteTuple { [KeyCol] = 1 };
+ var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+
+ // Drop table and create a new one with a different ID, then execute a computation again.
+ // This should update the cached table and complete the computation successfully.
+ await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName);
+ await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, tableName);
+
+ var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);
+
+ Assert.AreEqual(resNodeName, resNodeName2);
+ }
+ finally
+ {
+ await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName);
+ }
+ }
+
private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
(await Client.GetClusterNodesAsync()).OrderBy(n => n.Name).Skip(index).Take(1).ToList();
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index 68486aa07..2ed3bad6e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -18,7 +18,6 @@
namespace Apache.Ignite.Tests
{
using System;
- using System.Linq;
using System.Threading.Tasks;
using Ignite.Table;
using Log;
@@ -30,7 +29,7 @@ namespace Apache.Ignite.Tests
/// </summary>
public class IgniteTestsBase
{
- protected const string TableName = "PUB.tbl1";
+ protected const string TableName = "PUB.TBL1";
protected const string KeyCol = "key";
@@ -74,7 +73,7 @@ namespace Apache.Ignite.Tests
[OneTimeTearDown]
public void OneTimeTearDown()
{
- // ReSharper disable once ConstantConditionalAccessQualifier
+ // ReSharper disable once ConstantConditionalAccessQualifier, ConditionalAccessQualifierIsNonNullableAccordingToAPIContract
Client?.Dispose();
Assert.Greater(_eventListener.BuffersRented, 0);
@@ -83,10 +82,8 @@ namespace Apache.Ignite.Tests
}
[TearDown]
- public async Task TearDown()
+ public void TearDown()
{
- await TupleView.DeleteAllAsync(null, Enumerable.Range(-5, 20).Select(x => GetTuple(x)));
-
Assert.AreEqual(_eventListener.BuffersReturned, _eventListener.BuffersRented);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
index 86a68ca4e..df2702cc6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+// ReSharper disable MustUseReturnValue
namespace Apache.Ignite.Tests
{
using System;
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
index 19ce6b691..5a3ad6662 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewBinaryTests.cs
@@ -30,6 +30,12 @@ namespace Apache.Ignite.Tests.Table
/// </summary>
public class RecordViewBinaryTests : IgniteTestsBase
{
+ [TearDown]
+ public async Task CleanTable()
+ {
+ await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x)));
+ }
+
[Test]
public async Task TestUpsertGet()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
index 11de0746f..0f12adaeb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/RecordViewPocoTests.cs
@@ -29,6 +29,12 @@ namespace Apache.Ignite.Tests.Table
/// </summary>
public class RecordViewPocoTests : IgniteTestsBase
{
+ [TearDown]
+ public async Task CleanTable()
+ {
+ await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x)));
+ }
+
[Test]
public async Task TestUpsertGet()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs
index 0edc3f73e..ab082298f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/TablesTests.cs
@@ -33,7 +33,7 @@ namespace Apache.Ignite.Tests.Table
var tables = await Client.Tables.GetTablesAsync();
Assert.AreEqual(1, tables.Count);
- Assert.AreEqual("PUB.TBL1", tables[0].Name);
+ Assert.AreEqual(TableName, tables[0].Name);
}
[Test]
@@ -42,7 +42,7 @@ namespace Apache.Ignite.Tests.Table
var table = await Client.Tables.GetTableAsync(TableName);
Assert.IsNotNull(table);
- Assert.AreEqual("PUB.tbl1", table!.Name);
+ Assert.AreEqual(TableName, table!.Name);
}
[Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 6b5038b16..7d373ced1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -28,6 +28,12 @@ namespace Apache.Ignite.Tests.Transactions
/// </summary>
public class TransactionsTests : IgniteTestsBase
{
+ [TearDown]
+ public async Task CleanTable()
+ {
+ await TupleView.DeleteAllAsync(null, Enumerable.Range(1, 2).Select(x => GetTuple(x)));
+ }
+
[Test]
public async Task TestRecordViewBinaryOperations()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs b/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs
index 1c748b7b1..2b34b529c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientErrorCode.cs
@@ -35,6 +35,11 @@ namespace Apache.Ignite
/// <summary>
/// Authentication or authorization failure.
/// </summary>
- AuthFailed = 2
+ AuthFailed = 2,
+
+ /// <summary>
+ /// Table id does not exist.
+ /// </summary>
+ TableIdDoesNotExist = 3
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 09b0edd15..8c893bd6d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Compute
using System.Collections.Generic;
using System.Threading.Tasks;
using Network;
+ using Table;
/// <summary>
/// Ignite Compute API provides distributed job execution functionality.
@@ -36,6 +37,30 @@ namespace Apache.Ignite.Compute
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args);
+ /// <summary>
+ /// Executes a job represented by the given class on one node where the given key is located.
+ /// </summary>
+ /// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
+ /// <param name="key">Table key to be used to determine the target node for job execution.</param>
+ /// <param name="jobClassName">Java class name of the job to execute.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <typeparam name="T">Job result type.</typeparam>
+ /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+ Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args);
+
+ /// <summary>
+ /// Executes a job represented by the given class on one node where the given key is located.
+ /// </summary>
+ /// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
+ /// <param name="key">Table key to be used to determine the target node for job execution.</param>
+ /// <param name="jobClassName">Java class name of the job to execute.</param>
+ /// <param name="args">Job arguments.</param>
+ /// <typeparam name="T">Job result type.</typeparam>
+ /// <typeparam name="TKey">Key type.</typeparam>
+ /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+ Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args)
+ where TKey : class; // TODO: Remove class constraint (IGNITE-16355)
+
/// <summary>
/// Executes a compute job represented by the given class on all of the specified nodes.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs
index 026262f38..29d44989d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientException.cs
@@ -29,9 +29,6 @@ namespace Apache.Ignite
/** Error code field. */
private const string ErrorCodeField = "StatusCode";
- /** Error code. */
- private readonly ClientErrorCode _errorCode;
-
/// <summary>
/// Initializes a new instance of the <see cref="IgniteClientException"/> class.
/// </summary>
@@ -70,7 +67,7 @@ namespace Apache.Ignite
public IgniteClientException(string message, Exception? cause, ClientErrorCode statusCode)
: base(message, cause)
{
- _errorCode = statusCode;
+ ErrorCode = statusCode;
}
/// <summary>
@@ -81,9 +78,14 @@ namespace Apache.Ignite
protected IgniteClientException(SerializationInfo info, StreamingContext ctx)
: base(info, ctx)
{
- _errorCode = (ClientErrorCode) info.GetInt32(ErrorCodeField);
+ ErrorCode = (ClientErrorCode)info.GetInt32(ErrorCodeField);
}
+ /// <summary>
+ /// Gets the error code.
+ /// </summary>
+ public ClientErrorCode ErrorCode { get; }
+
/// <summary>
/// When overridden in a derived class, sets the <see cref="SerializationInfo" />
/// with information about the exception.
@@ -96,7 +98,7 @@ namespace Apache.Ignite
{
base.GetObjectData(info, context);
- info.AddValue(ErrorCodeField, (int) _errorCode);
+ info.AddValue(ErrorCodeField, (int) ErrorCode);
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 248818b34..0cf47f6ff 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -17,6 +17,8 @@
namespace Apache.Ignite.Internal.Compute
{
+ using System;
+ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
@@ -24,7 +26,10 @@ namespace Apache.Ignite.Internal.Compute
using Common;
using Ignite.Compute;
using Ignite.Network;
+ using Ignite.Table;
using Proto;
+ using Table;
+ using Table.Serialization;
/// <summary>
/// Compute API.
@@ -34,13 +39,21 @@ namespace Apache.Ignite.Internal.Compute
/** Socket. */
private readonly ClientFailoverSocket _socket;
+ /** Tables. */
+ private readonly Tables _tables;
+
+ /** Cached tables. */
+ private readonly ConcurrentDictionary<string, Table> _tableCache = new();
+
/// <summary>
/// Initializes a new instance of the <see cref="Compute"/> class.
/// </summary>
/// <param name="socket">Socket.</param>
- public Compute(ClientFailoverSocket socket)
+ /// <param name="tables">Tables.</param>
+ public Compute(ClientFailoverSocket socket, Tables tables)
{
_socket = socket;
+ _tables = tables;
}
/// <inheritdoc/>
@@ -52,6 +65,27 @@ namespace Apache.Ignite.Internal.Compute
return await ExecuteOnOneNode<T>(GetRandomNode(nodes), jobClassName, args).ConfigureAwait(false);
}
+ /// <inheritdoc/>
+ public async Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args) =>
+ await ExecuteColocatedAsync<T, IIgniteTuple>(
+ tableName,
+ key,
+ serializerHandlerFunc: _ => TupleSerializerHandler.Instance,
+ jobClassName,
+ args)
+ .ConfigureAwait(false);
+
+ /// <inheritdoc/>
+ public async Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args)
+ where TKey : class =>
+ await ExecuteColocatedAsync<T, TKey>(
+ tableName,
+ key,
+ serializerHandlerFunc: table => table.GetRecordViewInternal<TKey>().RecordSerializer.Handler,
+ jobClassName,
+ args)
+ .ConfigureAwait(false);
+
/// <inheritdoc/>
public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args)
{
@@ -82,7 +116,7 @@ namespace Apache.Ignite.Internal.Compute
}
private static ICollection<IClusterNode> GetNodesCollection(IEnumerable<IClusterNode> nodes) =>
- nodes is ICollection<IClusterNode> col ? col : nodes.ToList();
+ nodes as ICollection<IClusterNode> ?? nodes.ToList();
private async Task<T> ExecuteOnOneNode<T>(IClusterNode node, string jobClassName, object[] args)
{
@@ -138,5 +172,86 @@ namespace Apache.Ignite.Internal.Compute
return (T)reader.ReadObjectWithType()!;
}
}
+
+ private async Task<Table> GetTableAsync(string tableName)
+ {
+ if (_tableCache.TryGetValue(tableName, out var cachedTable))
+ {
+ return cachedTable;
+ }
+
+ var table = await _tables.GetTableInternalAsync(tableName).ConfigureAwait(false);
+
+ if (table != null)
+ {
+ _tableCache[tableName] = table;
+ return table;
+ }
+
+ _tableCache.TryRemove(tableName, out _);
+
+ throw new IgniteClientException($"Table '{tableName}' does not exist.");
+ }
+
+ private async Task<T> ExecuteColocatedAsync<T, TKey>(
+ string tableName,
+ TKey key,
+ Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc,
+ string jobClassName,
+ params object[] args)
+ where TKey : class
+ {
+ // TODO: IGNITE-16990 - implement partition awareness.
+ IgniteArgumentCheck.NotNull(tableName, nameof(tableName));
+ IgniteArgumentCheck.NotNull(key, nameof(key));
+ IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName));
+
+ while (true)
+ {
+ var table = await GetTableAsync(tableName).ConfigureAwait(false);
+ var schema = await table.GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var bufferWriter = Write(table, schema);
+
+ try
+ {
+ using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, bufferWriter).ConfigureAwait(false);
+
+ return Read(res);
+ }
+ catch (IgniteClientException e) when (e.ErrorCode == ClientErrorCode.TableIdDoesNotExist)
+ {
+ // Table was dropped - remove from cache.
+ // Try again in case a new table with the same name exists.
+ _tableCache.TryRemove(tableName, out _);
+ }
+ }
+
+ PooledArrayBufferWriter Write(Table table, Schema schema)
+ {
+ var bufferWriter = new PooledArrayBufferWriter();
+ var w = bufferWriter.GetMessageWriter();
+
+ w.Write(table.Id);
+ w.Write(schema.Version);
+
+ var serializerHandler = serializerHandlerFunc(table);
+ serializerHandler.Write(ref w, schema, key, true);
+
+ w.Write(jobClassName);
+ w.WriteObjectArrayWithTypes(args);
+
+ w.Flush();
+
+ return bufferWriter;
+ }
+
+ static T Read(in PooledBuffer buf)
+ {
+ var reader = buf.GetReader();
+
+ return (T)reader.ReadObjectWithType()!;
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 20dededa1..401ebe8d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -44,9 +44,13 @@ namespace Apache.Ignite.Internal
public IgniteClientInternal(ClientFailoverSocket socket)
{
_socket = socket;
- Tables = new Tables(socket);
+
+ var tables = new Tables(socket);
+ Tables = tables;
+
Transactions = new Transactions.Transactions(socket);
- Compute = new Compute.Compute(socket);
+
+ Compute = new Compute.Compute(socket, tables);
}
/// <inheritdoc/>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index b00eaa17b..72e89b0e9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -92,6 +92,9 @@ namespace Apache.Ignite.Internal.Proto
ComputeExecute = 47,
/** Get cluster nodes. */
- ClusterGetNodes = 48
+ ClusterGetNodes = 48,
+
+ /** Execute compute job. */
+ ComputeExecuteColocated = 49
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 714a3fa06..c1d65d717 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -52,6 +52,7 @@ namespace Apache.Ignite.Internal.Proto
ClientOp.TupleDeleteAllExact => ClientOperationType.TupleDeleteAllExact,
ClientOp.TupleGetAndDelete => ClientOperationType.TupleGetAndDelete,
ClientOp.ComputeExecute => ClientOperationType.ComputeExecute,
+ ClientOp.ComputeExecuteColocated => ClientOperationType.ComputeExecute,
ClientOp.TxBegin => null,
ClientOp.TxCommit => null,
ClientOp.TxRollback => null,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 5f1edea60..71e568083 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -52,6 +52,11 @@ namespace Apache.Ignite.Internal.Table
_ser = ser;
}
+ /// <summary>
+ /// Gets the record serializer.
+ /// </summary>
+ public RecordSerializer<T> RecordSerializer => _ser;
+
/// <inheritdoc/>
public async Task<T?> GetAsync(ITransaction? transaction, T key)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index f30a57d5b..ee0970ee2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -21,6 +21,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
using System.Collections.Generic;
using Buffers;
using MessagePack;
+ using Proto;
/// <summary>
/// Generic record serializer.
@@ -47,6 +48,11 @@ namespace Apache.Ignite.Internal.Table.Serialization
_handler = handler;
}
+ /// <summary>
+ /// Gets the handler.
+ /// </summary>
+ public IRecordSerializerHandler<T> Handler => _handler;
+
/// <summary>
/// Reads the value part.
/// </summary>
@@ -195,7 +201,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
{
var w = buf.GetMessageWriter();
- _table.WriteIdAndTx(ref w, tx);
+ WriteIdAndTx(ref w, tx);
w.Write(schema.Version);
w.Flush();
@@ -236,10 +242,29 @@ namespace Apache.Ignite.Internal.Table.Serialization
T rec,
bool keyOnly = false)
{
- _table.WriteIdAndTx(ref w, tx);
+ WriteIdAndTx(ref w, tx);
w.Write(schema.Version);
_handler.Write(ref w, schema, rec, keyOnly);
}
+
+ /// <summary>
+ /// Writes table id and transaction id, if present.
+ /// </summary>
+ /// <param name="w">Writer.</param>
+ /// <param name="tx">Transaction.</param>
+ private void WriteIdAndTx(ref MessagePackWriter w, Transactions.Transaction? tx)
+ {
+ w.Write(_table.Id);
+
+ if (tx == null)
+ {
+ w.WriteNil();
+ }
+ else
+ {
+ w.Write(tx.Id);
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 943c2d426..ff8392350 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -35,9 +35,6 @@ namespace Apache.Ignite.Internal.Table
/** Socket. */
private readonly ClientFailoverSocket _socket;
- /** Table id. */
- private readonly Guid _id;
-
/** Schemas. */
private readonly ConcurrentDictionary<int, Schema> _schemas = new();
@@ -60,7 +57,7 @@ namespace Apache.Ignite.Internal.Table
{
_socket = socket;
Name = name;
- _id = id;
+ Id = id;
RecordBinaryView = new RecordView<IIgniteTuple>(
this,
@@ -78,35 +75,30 @@ namespace Apache.Ignite.Internal.Table
/// </summary>
internal ClientFailoverSocket Socket => _socket;
+ /// <summary>
+ /// Gets the table id.
+ /// </summary>
+ internal Guid Id { get; }
+
/// <inheritdoc/>
public IRecordView<T> GetRecordView<T>()
+ where T : class =>
+ GetRecordViewInternal<T>();
+
+ /// <summary>
+ /// Gets the record view for the specified type.
+ /// </summary>
+ /// <typeparam name="T">Record type.</typeparam>
+ /// <returns>Record view.</returns>
+ internal RecordView<T> GetRecordViewInternal<T>()
where T : class
{
// ReSharper disable once HeapView.CanAvoidClosure (generics prevent this)
- return (IRecordView<T>)_recordViews.GetOrAdd(
+ return (RecordView<T>)_recordViews.GetOrAdd(
typeof(T),
_ => new RecordView<T>(this, new RecordSerializer<T>(this, new ObjectSerializerHandler<T>())));
}
- /// <summary>
- /// Writes the transaction id, if present.
- /// </summary>
- /// <param name="w">Writer.</param>
- /// <param name="tx">Transaction.</param>
- internal void WriteIdAndTx(ref MessagePackWriter w, Transactions.Transaction? tx)
- {
- w.Write(_id);
-
- if (tx == null)
- {
- w.WriteNil();
- }
- else
- {
- w.Write(tx.Id);
- }
- }
-
/// <summary>
/// Reads the schema.
/// </summary>
@@ -168,7 +160,7 @@ namespace Apache.Ignite.Internal.Table
void Write()
{
var w = writer.GetMessageWriter();
- w.Write(_id);
+ w.Write(Id);
if (version == null)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
index b8392099b..ef8ddd8f0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
@@ -35,8 +35,8 @@ namespace Apache.Ignite.Internal.Table
/** Socket. */
private readonly ClientFailoverSocket _socket;
- /** Cached tables. */
- private readonly ConcurrentDictionary<Guid, ITable> _tables = new();
+ /** Cached tables. Caching here is required to retain schema and serializer caches in <see cref="Table"/>. */
+ private readonly ConcurrentDictionary<Guid, Table> _tables = new();
/// <summary>
/// Initializes a new instance of the <see cref="Tables"/> class.
@@ -50,28 +50,7 @@ namespace Apache.Ignite.Internal.Table
/// <inheritdoc/>
public async Task<ITable?> GetTableAsync(string name)
{
- IgniteArgumentCheck.NotNull(name, nameof(name));
-
- using var writer = new PooledArrayBufferWriter();
- Write(writer.GetMessageWriter());
-
- using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false);
- return Read(resBuf.GetReader());
-
- void Write(MessagePackWriter w)
- {
- w.Write(name);
- w.Flush();
- }
-
- // ReSharper disable once LambdaExpressionMustBeStatic (requires .NET 5+)
- ITable? Read(MessagePackReader r) =>
- r.NextMessagePackType == MessagePackType.Nil
- ? null
- : _tables.GetOrAdd(
- r.ReadGuid(),
- (Guid id, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id, arg.Socket),
- (name, _socket));
+ return await GetTableInternalAsync(name).ConfigureAwait(false);
}
/// <inheritdoc/>
@@ -90,11 +69,49 @@ namespace Apache.Ignite.Internal.Table
{
var id = r.ReadGuid();
var name = r.ReadString();
- res.Add(new Table(name, id, _socket));
+
+ // ReSharper disable once LambdaExpressionMustBeStatic (not supported by .NET Core 3.1, TODO IGNITE-16994)
+ var table = _tables.GetOrAdd(
+ id,
+ (Guid id0, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id0, arg.Socket),
+ (name, _socket));
+
+ res.Add(table);
}
return res;
}
}
+
+ /// <summary>
+ /// Gets the table by name.
+ /// </summary>
+ /// <param name="name">Name.</param>
+ /// <returns>Table.</returns>
+ internal async Task<Table?> GetTableInternalAsync(string name)
+ {
+ IgniteArgumentCheck.NotNull(name, nameof(name));
+
+ using var writer = new PooledArrayBufferWriter();
+ Write(writer.GetMessageWriter());
+
+ using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false);
+ return Read(resBuf.GetReader());
+
+ void Write(MessagePackWriter w)
+ {
+ w.Write(name);
+ w.Flush();
+ }
+
+ // ReSharper disable once LambdaExpressionMustBeStatic (requires .NET 5+)
+ Table? Read(MessagePackReader r) =>
+ r.NextMessagePackType == MessagePackType.Nil
+ ? null
+ : _tables.GetOrAdd(
+ r.ReadGuid(),
+ (Guid id, (string Name, ClientFailoverSocket Socket) arg) => new Table(arg.Name, id, arg.Socket),
+ (name, _socket));
+ }
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index a0acb8e5c..4e14c9e8e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.Table;
/**
* Helper class for non-Java platform tests (.NET, C++, Python, ...). Starts nodes, populates tables and data for tests.
@@ -150,4 +153,39 @@ public class PlatformTestNodeRunner {
private static int getPort(IgniteImpl node) {
return node.clientAddress().port();
}
+
+ /**
+ * Compute job that creates a table.
+ */
+ @SuppressWarnings({"unused"}) // Used by platform tests.
+ private static class CreateTableJob implements ComputeJob<String> {
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ String tableName = (String) args[0];
+
+ Table table = context.ignite().tables().createTable(
+ tableName,
+ tblChanger -> tblChanger
+ .changeColumns(cols ->
+ cols.create("key", col -> col.changeType(t -> t.changeType("INT64")).changeNullable(false)))
+ .changePrimaryKey(pk -> pk.changeColumns("key").changeColocationColumns("key"))
+ );
+
+ return table.name();
+ }
+ }
+
+ /**
+ * Compute job that drops a table.
+ */
+ @SuppressWarnings({"unused"}) // Used by platform tests.
+ private static class DropTableJob implements ComputeJob<String> {
+ @Override
+ public String execute(JobExecutionContext context, Object... args) {
+ String tableName = (String) args[0];
+ context.ignite().tables().dropTable(tableName);
+
+ return tableName;
+ }
+ }
}