You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/09/05 07:37:59 UTC
[ignite-3] branch main updated: IGNITE-17593 .NET: Thin 3.0: Implement BinaryTuple serialization (#1051)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3cea078225 IGNITE-17593 .NET: Thin 3.0: Implement BinaryTuple serialization (#1051)
3cea078225 is described below
commit 3cea078225b107a6b75d4d06973d362d6a973a50
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Sep 5 10:37:55 2022 +0300
IGNITE-17593 .NET: Thin 3.0: Implement BinaryTuple serialization (#1051)
* Port `BinaryTupleBuilder` and `BinaryTupleReader` from Java to .NET.
* Refactor `PooledArrayBufferWriter` to allow variable size prefix.
* Change UUID part order in `BinaryTuple` to be consistent with other implementations (client, BinaryRow).
---
.../internal/binarytuple/BinaryTupleBuilder.java | 2 +-
.../internal/binarytuple/BinaryTupleParser.java | 4 +-
.../SerializerHandlerBenchmarksBase.cs | 2 +-
.../Buffers/PooledArrayBufferWriterTests.cs | 2 +-
.../Apache.Ignite.Tests/ClientSocketTests.cs | 4 +-
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 6 +-
.../Proto/BinaryTuple/BinaryTupleTests.cs | 356 ++++++++++++++++++
.../Proto/MessagePackExtensionsTest.cs | 4 +-
.../Serialization/ObjectSerializerHandlerTests.cs | 7 +-
.../dotnet/Apache.Ignite/Apache.Ignite.csproj | 1 -
.../Internal/Buffers/PooledArrayBufferWriter.cs | 159 ++++++--
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 36 +-
.../Apache.Ignite/Internal/Compute/Compute.cs | 6 +-
.../Proto/BinaryTuple/BinaryTupleBuilder.cs | 405 +++++++++++++++++++++
.../Proto/BinaryTuple/BinaryTupleCommon.cs | 105 ++++++
.../Proto/BinaryTuple/BinaryTupleReader.cs | 244 +++++++++++++
.../Internal/Proto/MessagePackReaderExtensions.cs | 16 +-
.../Internal/Proto/MessagePackWriterExtensions.cs | 29 +-
.../Apache.Ignite/Internal/Proto/ProtoCommon.cs | 13 +
.../Apache.Ignite/Internal/Proto/UuidSerializer.cs | 74 ++++
.../dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs | 4 +-
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 2 +-
.../Apache.Ignite/Internal/Table/RecordView.cs | 14 +-
.../Table/Serialization/RecordSerializer.cs | 7 +-
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 2 +-
.../dotnet/Apache.Ignite/Internal/Table/Tables.cs | 3 +-
.../Internal/Transactions/Transaction.cs | 5 +-
27 files changed, 1369 insertions(+), 143 deletions(-)
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
index 60825b3aad..49737f650e 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
@@ -390,8 +390,8 @@ public class BinaryTupleBuilder {
long lsb = value.getLeastSignificantBits();
long msb = value.getMostSignificantBits();
if ((lsb | msb) != 0L) {
- putLong(lsb);
putLong(msb);
+ putLong(lsb);
}
return proceed();
}
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
index 0ace31f197..534ee52da7 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleParser.java
@@ -372,8 +372,8 @@ public class BinaryTupleParser {
}
throw new BinaryTupleFormatException("Invalid length for a tuple element");
}
- long lsb = buffer.getLong(begin);
- long msb = buffer.getLong(begin + 8);
+ long msb = buffer.getLong(begin);
+ long lsb = buffer.getLong(begin + 8);
return new UUID(msb, lsb);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
index 022a23f53f..13c3a04abe 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Benchmarks/Table/Serialization/SerializerHandlerBenchmarksBase.cs
@@ -77,7 +77,7 @@ namespace Apache.Ignite.Benchmarks.Table.Serialization
TupleSerializerHandler.Instance.Write(ref writer, Schema, Tuple);
writer.Flush();
- return pooledWriter.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize).ToArray();
+ return pooledWriter.GetWrittenMemory().ToArray();
}
protected internal class Car
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
index b281748cd3..deb09783eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Buffers/PooledArrayBufferWriterTests.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Tests.Buffers
writer.Write("A");
writer.Flush();
- var res = bufferWriter.GetWrittenMemory()[PooledArrayBufferWriter.ReservedPrefixSize..].ToArray();
+ var res = bufferWriter.GetWrittenMemory().ToArray();
CollectionAssert.AreEqual(new byte[] { 1, 0xa1, (byte)'A' }, res);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
index f6a0d09bb8..e845884025 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ClientSocketTests.cs
@@ -37,7 +37,7 @@ namespace Apache.Ignite.Tests
{
using var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort), new());
- using var requestWriter = new PooledArrayBufferWriter();
+ using var requestWriter = ProtoCommon.GetMessageWriter();
WriteString(requestWriter.GetMessageWriter(), "non-existent-table");
@@ -56,7 +56,7 @@ namespace Apache.Ignite.Tests
{
using var socket = await ClientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, ServerPort), new());
- using var requestWriter = new PooledArrayBufferWriter();
+ using var requestWriter = ProtoCommon.GetMessageWriter();
requestWriter.GetMessageWriter().Write(123);
var ex = Assert.ThrowsAsync<IgniteClientException>(
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 24794213c4..ab20970e67 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -122,7 +122,7 @@ namespace Apache.Ignite.Tests
}
private static void Send(Socket socket, long requestId, PooledArrayBufferWriter writer, bool isError = false)
- => Send(socket, requestId, writer.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize), isError);
+ => Send(socket, requestId, writer.GetWrittenMemory(), isError);
private static void Send(Socket socket, long requestId, ReadOnlyMemory<byte> payload, bool isError = false)
{
@@ -139,7 +139,7 @@ namespace Apache.Ignite.Tests
writer.Flush();
- var headerMem = header.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize);
+ var headerMem = header.GetWrittenMemory();
var size = BitConverter.GetBytes(IPAddress.HostToNetworkOrder(headerMem.Length + payload.Length));
socket.Send(size);
@@ -286,7 +286,7 @@ namespace Apache.Ignite.Tests
handshakeWriter.WriteMapHeader(0); // Extensions.
handshakeWriter.Flush();
- var handshakeMem = handshakeBufferWriter.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize);
+ var handshakeMem = handshakeBufferWriter.GetWrittenMemory();
handler.Send(new byte[] { 0, 0, 0, (byte)handshakeMem.Length }); // Size.
handler.Send(handshakeMem.Span);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
new file mode 100644
index 0000000000..4e60f61b27
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/BinaryTuple/BinaryTupleTests.cs
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests.Proto.BinaryTuple
+{
+ using System;
+ using System.Linq;
+ using Internal.Proto.BinaryTuple;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests for binary tuple.
+ /// </summary>
+ public class BinaryTupleTests
+ {
+ [Test]
+ public void TestNullValue()
+ {
+ // Header: 1 byte with null map flag.
+ // NullMap: 1 byte with first bit set.
+ // Offset table: 1 zero byte.
+ byte[] bytes = { BinaryTupleCommon.NullmapFlag, 1, 0 };
+ var reader = new BinaryTupleReader(bytes, 1);
+
+ Assert.IsTrue(reader.HasNullMap);
+ Assert.IsTrue(reader.IsNull(0));
+ }
+
+ [Test]
+ public void TestGetValueThrowsOnNull()
+ {
+ var reader = BuildAndRead(b => b.AppendNull());
+
+ var getters = new Action<BinaryTupleReader>[]
+ {
+ x => x.GetString(0),
+ x => x.GetByte(0),
+ x => x.GetShort(0),
+ x => x.GetInt(0),
+ x => x.GetLong(0),
+ x => x.GetFloat(0),
+ x => x.GetDouble(0),
+ x => x.GetGuid(0)
+ };
+
+ foreach (var getter in getters)
+ {
+ var ex = Assert.Throws<InvalidOperationException>(() => getter(reader));
+ Assert.AreEqual("Binary tuple element with index 0 is null.", ex!.Message);
+ }
+ }
+
+ [Test]
+ public void TestAppendNull()
+ {
+ var reader = BuildAndRead(b => b.AppendNull());
+
+ Assert.IsTrue(reader.HasNullMap);
+ Assert.IsTrue(reader.IsNull(0));
+ }
+
+ [Test]
+ public void TestDefaultValue()
+ {
+ // Header: 1 zero byte.
+ // Offset table: 1 zero byte.
+ byte[] bytes1 = { 0, 0 };
+
+ // Header: 1 byte with null map flag.
+ // NullMap: 1 byte with no bit set.
+ // Offset table: 1 zero byte.
+ byte[] bytes2 = { BinaryTupleCommon.NullmapFlag, 0, 0 };
+
+ byte[][] bytesArray = { bytes1, bytes2 };
+
+ foreach (var bytes in bytesArray)
+ {
+ var reader = new BinaryTupleReader(bytes, 1);
+
+ if (bytes.Length == bytes1.Length)
+ {
+ Assert.IsFalse(reader.HasNullMap);
+ }
+ else
+ {
+ Assert.IsTrue(reader.HasNullMap);
+ }
+
+ Assert.IsFalse(reader.IsNull(0));
+ Assert.AreEqual(string.Empty, reader.GetString(0));
+ Assert.AreEqual(Guid.Empty, reader.GetGuid(0));
+ Assert.AreEqual(0, reader.GetByte(0));
+ Assert.AreEqual(0, reader.GetShort(0));
+ Assert.AreEqual(0, reader.GetInt(0));
+ Assert.AreEqual(0L, reader.GetLong(0));
+ }
+ }
+
+ [Test]
+ public void TestByte([Values(0, 1, sbyte.MaxValue, sbyte.MinValue)] sbyte value)
+ {
+ var res = Build(b => b.AppendByte(value));
+
+ Assert.AreEqual(value != 0 ? 1 : 0, res[1]);
+ Assert.AreEqual(value != 0 ? 3 : 2, res.Length);
+
+ var reader = new BinaryTupleReader(res, 1);
+ Assert.AreEqual(value, reader.GetByte(0));
+ }
+
+ [Test]
+ public void TestShort()
+ {
+ short[] values = {sbyte.MinValue, -1, 0, 1, sbyte.MaxValue};
+
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendShort(value));
+
+ Assert.AreEqual(value != 0 ? 1 : 0, bytes[1]);
+ Assert.AreEqual(value != 0 ? 3 : 2, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetShort(0));
+ }
+
+ values = new short[] { short.MinValue, sbyte.MinValue - 1, sbyte.MaxValue + 1, short.MaxValue };
+
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendShort(value));
+
+ Assert.AreEqual(2, bytes[1]);
+ Assert.AreEqual(4, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetShort(0));
+ }
+ }
+
+ [Test]
+ public void TestInt()
+ {
+ int[] values = { sbyte.MinValue, -1, 0, 1, sbyte.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendInt(value));
+
+ Assert.AreEqual(value != 0 ? 1 : 0, bytes[1]);
+ Assert.AreEqual(value != 0 ? 3 : 2, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetInt(0));
+ }
+
+ values = new[] { short.MinValue, sbyte.MinValue - 1, sbyte.MaxValue + 1, short.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendInt(value));
+
+ Assert.AreEqual(2, bytes[1]);
+ Assert.AreEqual(4, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetInt(0));
+ }
+
+ values = new[] { int.MinValue, short.MinValue - 1, short.MaxValue + 1, int.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendInt(value));
+
+ Assert.AreEqual(4, bytes[1]);
+ Assert.AreEqual(6, bytes.Length);
+
+ BinaryTupleReader reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetInt(0));
+ }
+ }
+
+ [Test]
+ public void TestLong()
+ {
+ long[] values = { sbyte.MinValue, -1, 0, 1, sbyte.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendLong(value));
+
+ Assert.AreEqual(value != 0 ? 1 : 0, bytes[1]);
+ Assert.AreEqual(value != 0 ? 3 : 2, bytes.Length);
+
+ BinaryTupleReader reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetLong(0));
+ }
+
+ values = new long[] { short.MinValue, sbyte.MinValue - 1, sbyte.MaxValue + 1, short.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendLong(value));
+
+ Assert.AreEqual(2, bytes[1]);
+ Assert.AreEqual(4, bytes.Length);
+
+ BinaryTupleReader reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetLong(0));
+ }
+
+ values = new long[] { int.MinValue, short.MinValue - 1, short.MaxValue + 1, int.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendLong(value));
+
+ Assert.AreEqual(4, bytes[1]);
+ Assert.AreEqual(6, bytes.Length);
+
+ BinaryTupleReader reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetLong(0));
+ }
+
+ values = new[] { long.MinValue, int.MinValue - 1L, int.MaxValue + 1L, long.MaxValue };
+ foreach (var value in values)
+ {
+ var bytes = Build(b => b.AppendLong(value));
+
+ Assert.AreEqual(8, bytes[1]);
+ Assert.AreEqual(10, bytes.Length);
+
+ BinaryTupleReader reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetLong(0));
+ }
+ }
+
+ [Test]
+ public void TestFloat()
+ {
+ {
+ float value = 0.0F;
+ var bytes = Build(b => b.AppendFloat(value));
+
+ Assert.AreEqual(0, bytes[1]);
+ Assert.AreEqual(2, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetFloat(0));
+ }
+
+ {
+ float value = 0.5F;
+ var bytes = Build(b => b.AppendFloat(value));
+
+ Assert.AreEqual(4, bytes[1]);
+ Assert.AreEqual(6, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetFloat(0));
+ }
+ }
+
+ [Test]
+ public void TestDouble()
+ {
+ {
+ double value = 0.0;
+ var bytes = Build(b => b.AppendDouble(value));
+
+ Assert.AreEqual(0, bytes[1]);
+ Assert.AreEqual(2, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetDouble(0));
+ }
+
+ {
+ double value = 0.5;
+ var bytes = Build(b => b.AppendDouble(value));
+
+ Assert.AreEqual(4, bytes[1]);
+ Assert.AreEqual(6, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetDouble(0));
+ }
+
+ {
+ double value = 0.1;
+ var bytes = Build(b => b.AppendDouble(value));
+
+ Assert.AreEqual(8, bytes[1]);
+ Assert.AreEqual(10, bytes.Length);
+
+ var reader = new BinaryTupleReader(bytes, 1);
+ Assert.AreEqual(value, reader.GetDouble(0));
+ }
+ }
+
+ [Test]
+ public void TestString()
+ {
+ var values = new[] {"ascii", "我愛Java", string.Empty, "a string with a bit more characters"};
+
+ var reader = BuildAndRead(b => values.ToList().ForEach(b.AppendString), numElements: values.Length);
+
+ for (var i = 0; i < values.Length; i++)
+ {
+ Assert.AreEqual(values[i], reader.GetString(i));
+ }
+ }
+
+ [Test]
+ public void TestGuid()
+ {
+ var guid = Guid.NewGuid();
+
+ var reader = BuildAndRead(
+ b =>
+ {
+ b.AppendGuid(Guid.Empty);
+ b.AppendGuid(guid);
+ },
+ numElements: 2);
+
+ Assert.AreEqual(Guid.Empty, reader.GetGuid(0));
+ Assert.AreEqual(guid, reader.GetGuid(1));
+ }
+
+ private static BinaryTupleReader BuildAndRead(Action<BinaryTupleBuilder> build, int numElements = 1)
+ {
+ var bytes = Build(build, numElements);
+
+ return new BinaryTupleReader(bytes, numElements);
+ }
+
+ private static byte[] Build(Action<BinaryTupleBuilder> build, int numElements = 1)
+ {
+ using var builder = new BinaryTupleBuilder(numElements);
+
+ build(builder);
+
+ return builder.Build().ToArray();
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
index c0096a3c07..1d036a3cb0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MessagePackExtensionsTest.cs
@@ -118,7 +118,7 @@ namespace Apache.Ignite.Tests.Proto
writer.Write(Guid.Parse(JavaUuidString));
writer.Flush();
- var bytes = bufferWriter.GetWrittenMemory()[PooledArrayBufferWriter.ReservedPrefixSize..]
+ var bytes = bufferWriter.GetWrittenMemory()
.ToArray()
.Select(b => (sbyte) b)
.ToArray();
@@ -198,7 +198,7 @@ namespace Apache.Ignite.Tests.Proto
var bufferWriter = new PooledArrayBufferWriter();
write(bufferWriter);
- var mem = bufferWriter.GetWrittenMemory().Slice(PooledArrayBufferWriter.ReservedPrefixSize);
+ var mem = bufferWriter.GetWrittenMemory();
return read(mem);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
index b00a6b3798..705f277d3e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/Serialization/ObjectSerializerHandlerTests.cs
@@ -51,9 +51,7 @@ namespace Apache.Ignite.Tests.Table.Serialization
public void TestWriteUnsigned()
{
var bytes = Write(new UnsignedPoco(ulong.MaxValue, "foo"));
-
- var resMem = bytes[PooledArrayBufferWriter.ReservedPrefixSize..]; // Skip length header.
- var reader = new MessagePackReader(resMem);
+ var reader = new MessagePackReader(bytes);
Assert.AreEqual(ulong.MaxValue, reader.ReadUInt64());
Assert.AreEqual("foo", reader.ReadString());
@@ -130,8 +128,7 @@ namespace Apache.Ignite.Tests.Table.Serialization
{
var bytes = Write(new Poco { Key = 1234, Val = "foo" }, keyOnly);
- var resMem = bytes[PooledArrayBufferWriter.ReservedPrefixSize..]; // Skip length header.
- return new MessagePackReader(resMem);
+ return new MessagePackReader(bytes);
}
private static byte[] Write<T>(T obj, bool keyOnly = false)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Apache.Ignite.csproj b/modules/platforms/dotnet/Apache.Ignite/Apache.Ignite.csproj
index 954b02d1fa..6f6466b9ea 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Apache.Ignite.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite/Apache.Ignite.csproj
@@ -21,7 +21,6 @@
<TargetFramework>netstandard2.1</TargetFramework>
<CodeAnalysisRuleSet>..\Apache.Ignite.ruleset</CodeAnalysisRuleSet>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
- <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>Apache.Ignite.snk</AssemblyOriginatorKeyFile>
<IsPackable>true</IsPackable>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
index 8a76f1941c..79e36eaf98 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBufferWriter.cs
@@ -19,8 +19,8 @@ namespace Apache.Ignite.Internal.Buffers
{
using System;
using System.Buffers;
+ using System.Buffers.Binary;
using System.Diagnostics;
- using System.Net;
using MessagePack;
/// <summary>
@@ -40,10 +40,8 @@ namespace Apache.Ignite.Internal.Buffers
/// </summary>
internal sealed class PooledArrayBufferWriter : IBufferWriter<byte>, IDisposable
{
- /// <summary>
- /// Reserved prefix size.
- /// </summary>
- public const int ReservedPrefixSize = 4 + 5 + 9; // Size (4 bytes) + OpCode (5 bytes) + RequestId (9 bytes)/
+ /** Prefix size. */
+ private readonly int _prefixSize;
/** Underlying pooled array. */
private byte[] _buffer;
@@ -58,21 +56,28 @@ namespace Apache.Ignite.Internal.Buffers
/// Initializes a new instance of the <see cref="PooledArrayBufferWriter"/> class.
/// </summary>
/// <param name="initialCapacity">Initial capacity.</param>
- public PooledArrayBufferWriter(int initialCapacity = PooledBuffer.DefaultCapacity)
+ /// <param name="prefixSize">Size of the reserved space at the start of the buffer.</param>
+ public PooledArrayBufferWriter(int initialCapacity = PooledBuffer.DefaultCapacity, int prefixSize = 0)
{
// NOTE: Shared pool has 1M elements limit before .NET 6.
// https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#buffering
_buffer = ByteArrayPool.Rent(initialCapacity);
- _index = ReservedPrefixSize;
+ _prefixSize = prefixSize;
+ _index = prefixSize;
}
+ /// <summary>
+ /// Gets the current write position, relative to the end of the reserved prefix.
+ /// </summary>
+ public int Position => _index - _prefixSize;
+
/// <summary>
/// Gets the free capacity.
/// </summary>
private int FreeCapacity => _buffer.Length - _index;
/// <summary>
- /// Gets the written memory, including reserved prefix (see <see cref="ReservedPrefixSize"/>).
+ /// Gets the written memory, including prefix, if any.
/// </summary>
/// <returns>Written array.</returns>
public Memory<byte> GetWrittenMemory()
@@ -85,10 +90,7 @@ namespace Apache.Ignite.Internal.Buffers
/// <inheritdoc />
public void Advance(int count)
{
- if (count < 0)
- {
- throw new ArgumentException(null, nameof(count));
- }
+ Debug.Assert(count >= 0, "count >= 0");
if (_index > _buffer.Length - count)
{
@@ -112,60 +114,141 @@ namespace Apache.Ignite.Internal.Buffers
return _buffer.AsSpan(_index);
}
+ /// <summary>
+ /// Gets a span for writing at the specified position.
+ /// </summary>
+ /// <param name="position">Position, relative to the end of the reserved prefix.</param>
+ /// <param name="size">Size of the requested span, in bytes.</param>
+ /// <returns>Span for writing.</returns>
+ public Span<byte> GetSpan(int position, int size)
+ {
+ var overflow = _prefixSize + position + size - _index;
+
+ if (overflow > 0)
+ {
+ CheckAndResizeBuffer(overflow);
+ }
+
+ return _buffer.AsSpan(_prefixSize + position, size);
+ }
+
/// <summary>
/// Gets the <see cref="MessagePackWriter"/> for this buffer.
/// </summary>
/// <returns><see cref="MessagePackWriter"/> for this buffer.</returns>
public MessagePackWriter GetMessageWriter() => new(this);
+ /// <inheritdoc />
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ ByteArrayPool.Return(_buffer);
+ _disposed = true;
+ }
+ }
+
/// <summary>
- /// Reserves space for an int32 value and returns its position.
+ /// Writes a byte at current position.
/// </summary>
- /// <returns>Reserved int position. To be used with <see cref="WriteInt32"/>.</returns>
- public int ReserveInt32()
+ /// <param name="val">Value.</param>
+ public void WriteByte(byte val)
{
- var pos = _index;
+ CheckAndResizeBuffer(1);
+ _buffer[_index++] = val;
+ }
- Advance(5);
+ /// <summary>
+ /// Writes a byte at specified position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ /// <param name="pos">Position, relative to the end of the reserved prefix.</param>
+ public void WriteByte(byte val, int pos)
+ {
+ _buffer[pos + _prefixSize] = val;
+ }
- return pos;
+ /// <summary>
+ /// Writes a short at current position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ public void WriteShort(short val)
+ {
+ CheckAndResizeBuffer(2);
+ BinaryPrimitives.WriteInt16LittleEndian(_buffer.AsSpan(_index), val);
+ _index += 2;
}
/// <summary>
- /// Writes an int32 value at the given position. Intended to be used with <see cref="ReserveInt32"/>.
+ /// Writes a short at specified position.
/// </summary>
- /// <param name="position">Position.</param>
- /// <param name="value">Value.</param>
- public unsafe void WriteInt32(int position, int value)
+ /// <param name="val">Value.</param>
+ /// <param name="pos">Position, relative to the end of the reserved prefix.</param>
+ public void WriteShort(short val, int pos)
{
- fixed (byte* ptr = &_buffer[position + 1])
- {
- *(int*)ptr = IPAddress.HostToNetworkOrder(value);
- }
+ BinaryPrimitives.WriteInt16LittleEndian(_buffer.AsSpan(pos + _prefixSize), val);
+ }
- _buffer[position] = MessagePackCode.Int32;
+ /// <summary>
+ /// Writes an int at current position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ public void WriteInt(int val)
+ {
+ CheckAndResizeBuffer(4);
+ BinaryPrimitives.WriteInt32LittleEndian(_buffer.AsSpan(_index), val);
+ _index += 4;
}
- /// <inheritdoc />
- public void Dispose()
+ /// <summary>
+ /// Writes an int at specified position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ /// <param name="pos">Position, relative to the end of the reserved prefix.</param>
+ public void WriteInt(int val, int pos)
{
- if (!_disposed)
- {
- ByteArrayPool.Return(_buffer);
- _disposed = true;
- }
+ BinaryPrimitives.WriteInt32LittleEndian(_buffer.AsSpan(pos + _prefixSize), val);
}
+ /// <summary>
+ /// Writes a long at current position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ public void WriteLong(long val)
+ {
+ CheckAndResizeBuffer(8);
+ BinaryPrimitives.WriteInt64LittleEndian(_buffer.AsSpan(_index), val);
+ _index += 8;
+ }
+
+ /// <summary>
+ /// Reads a byte at specified position.
+ /// </summary>
+ /// <param name="pos">Position, relative to the end of the reserved prefix.</param>
+ /// <returns>Result.</returns>
+ public byte ReadByte(int pos) => _buffer[pos + _prefixSize];
+
+ /// <summary>
+ /// Reads a short at specified position.
+ /// </summary>
+ /// <param name="pos">Position, relative to the end of the reserved prefix.</param>
+ /// <returns>Result.</returns>
+ public short ReadShort(int pos) => BinaryPrimitives.ReadInt16LittleEndian(_buffer.AsSpan(pos + _prefixSize));
+
+ /// <summary>
+ /// Reads an int at specified position.
+ /// </summary>
+ /// <param name="pos">Position, relative to the end of the reserved prefix.</param>
+ /// <returns>Result.</returns>
+ public int ReadInt(int pos) => BinaryPrimitives.ReadInt32LittleEndian(_buffer.AsSpan(pos + _prefixSize));
+
/// <summary>
/// Checks underlying buffer and resizes if necessary.
/// </summary>
/// <param name="sizeHint">Size hint.</param>
private void CheckAndResizeBuffer(int sizeHint)
{
- if (sizeHint < 0)
- {
- throw new ArgumentException(null, nameof(sizeHint));
- }
+ Debug.Assert(sizeHint >= 0, "sizeHint >= 0");
if (sizeHint == 0)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 74f36adc17..78cb555984 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Internal
{
using System;
+ using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@@ -80,7 +81,7 @@ namespace Apache.Ignite.Internal
private readonly IIgniteLogger? _logger;
/** Pre-allocated buffer for message size + op code + request id. To be used under <see cref="_sendLock"/>. */
- private readonly byte[] _prefixBuffer = new byte[PooledArrayBufferWriter.ReservedPrefixSize];
+ private readonly byte[] _prefixBuffer = new byte[ProtoCommon.MessagePrefixSize];
/** Request id generator. */
private long _requestId;
@@ -339,7 +340,7 @@ namespace Apache.Ignite.Internal
await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, cancellationToken).ConfigureAwait(false);
- return GetMessageSize(buffer);
+ return ReadMessageSize(buffer);
}
private static async Task ReceiveBytesAsync(
@@ -366,25 +367,15 @@ namespace Apache.Ignite.Internal
}
}
- private static unsafe int GetMessageSize(byte[] responseLenBytes)
- {
- fixed (byte* len = &responseLenBytes[0])
- {
- var messageSize = *(int*)len;
-
- return IPAddress.NetworkToHostOrder(messageSize);
- }
- }
-
private static async ValueTask WriteHandshakeAsync(NetworkStream stream, ClientProtocolVersion version)
{
- using var bufferWriter = new PooledArrayBufferWriter();
+ using var bufferWriter = new PooledArrayBufferWriter(prefixSize: ProtoCommon.MessagePrefixSize);
WriteHandshake(version, bufferWriter.GetMessageWriter());
// Prepend size.
var buf = bufferWriter.GetWrittenMemory();
- var size = buf.Length - PooledArrayBufferWriter.ReservedPrefixSize;
- var resBuf = buf.Slice(PooledArrayBufferWriter.ReservedPrefixSize - 4);
+ var size = buf.Length - ProtoCommon.MessagePrefixSize;
+ var resBuf = buf.Slice(ProtoCommon.MessagePrefixSize - 4);
WriteMessageSize(resBuf, size);
await stream.WriteAsync(resBuf).ConfigureAwait(false);
@@ -405,13 +396,10 @@ namespace Apache.Ignite.Internal
w.Flush();
}
- private static unsafe void WriteMessageSize(Memory<byte> target, int size)
- {
- fixed (byte* bufPtr = target.Span)
- {
- *(int*)bufPtr = IPAddress.HostToNetworkOrder(size);
- }
- }
+ private static void WriteMessageSize(Memory<byte> target, int size) =>
+ BinaryPrimitives.WriteInt32BigEndian(target.Span, size);
+
+ private static int ReadMessageSize(Span<byte> responseLenBytes) => BinaryPrimitives.ReadInt32BigEndian(responseLenBytes);
private static TimeSpan GetHeartbeatInterval(TimeSpan configuredInterval, TimeSpan serverIdleTimeout, IIgniteLogger? logger)
{
@@ -475,10 +463,10 @@ namespace Apache.Ignite.Internal
{
var requestBuf = request.GetWrittenMemory();
- WriteMessageSize(_prefixBuffer, prefixSize + requestBuf.Length - PooledArrayBufferWriter.ReservedPrefixSize);
+ WriteMessageSize(_prefixBuffer, prefixSize + requestBuf.Length - ProtoCommon.MessagePrefixSize);
var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
- var requestBufStart = PooledArrayBufferWriter.ReservedPrefixSize - prefixBytes.Length;
+ var requestBufStart = ProtoCommon.MessagePrefixSize - prefixBytes.Length;
var requestBufWithPrefix = requestBuf.Slice(requestBufStart);
// Copy prefix to request buf to avoid extra WriteAsync call for the prefix.
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 4d9098eb2a..c1db86a743 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -125,7 +125,7 @@ namespace Apache.Ignite.Internal.Compute
// Try direct connection to the specified node.
if (_socket.GetEndpoint(node.Name) is { } endpoint)
{
- using var writerWithoutNode = new PooledArrayBufferWriter();
+ using var writerWithoutNode = ProtoCommon.GetMessageWriter();
Write(writerWithoutNode, writeNode: false);
using var res1 = await _socket.TryDoOutInOpAsync(endpoint, ClientOp.ComputeExecute, writerWithoutNode)
@@ -139,7 +139,7 @@ namespace Apache.Ignite.Internal.Compute
}
// When direct connection is not available, use default connection and pass target node info to the server.
- using var writerWithNode = new PooledArrayBufferWriter();
+ using var writerWithNode = ProtoCommon.GetMessageWriter();
Write(writerWithNode, writeNode: true);
using var res2 = await _socket.DoOutInOpAsync(ClientOp.ComputeExecute, writerWithNode).ConfigureAwait(false);
@@ -230,7 +230,7 @@ namespace Apache.Ignite.Internal.Compute
PooledArrayBufferWriter Write(Table table, Schema schema)
{
- var bufferWriter = new PooledArrayBufferWriter();
+ var bufferWriter = ProtoCommon.GetMessageWriter();
var w = bufferWriter.GetMessageWriter();
w.Write(table.Id);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
new file mode 100644
index 0000000000..1381d05f32
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto.BinaryTuple
+{
+ using System;
+ using System.Diagnostics;
+ using System.Text;
+ using Buffers;
+
+ /// <summary>
+ /// Binary tuple builder.
+ /// </summary>
+ internal sealed class BinaryTupleBuilder : IDisposable // TODO: Support all types (IGNITE-15431).
+ {
+ /** Number of elements in the tuple. */
+ private readonly int _numElements;
+
+ /** Size of an offset table entry. */
+ private readonly int _entrySize;
+
+ /** Position of the varlen offset table. */
+ private readonly int _entryBase;
+
+ /** Starting position of variable-length values. */
+ private readonly int _valueBase;
+
+ /** Buffer for tuple content. */
+ private readonly PooledArrayBufferWriter _buffer = new();
+
+ /** Flag indicating if any NULL values were really put here. */
+ private bool _hasNullValues;
+
+ /** Current element. */
+ private int _elementIndex;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BinaryTupleBuilder"/> class.
+ /// </summary>
+ /// <param name="numElements">Number of elements in the tuple.</param>
+ /// <param name="allowNulls">Whether nulls are allowed.</param>
+ /// <param name="totalValueSize">Total value size, -1 when unknown.</param>
+ public BinaryTupleBuilder(int numElements, bool allowNulls = true, int totalValueSize = -1)
+ {
+ Debug.Assert(numElements > 0, "numElements > 0");
+
+ _numElements = numElements;
+
+ int baseOffset = BinaryTupleCommon.HeaderSize;
+ if (allowNulls)
+ {
+ baseOffset += BinaryTupleCommon.NullMapSize(numElements);
+ }
+
+ _entryBase = baseOffset;
+
+ _entrySize = totalValueSize < 0
+ ? 4
+ : BinaryTupleCommon.FlagsToEntrySize(BinaryTupleCommon.ValueSizeToFlags(totalValueSize));
+
+ _valueBase = baseOffset + _entrySize * numElements;
+
+ _buffer.GetSpan(_valueBase);
+ _buffer.Advance(_valueBase);
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether null map is present.
+ /// </summary>
+ public bool HasNullMap => _entryBase > BinaryTupleCommon.HeaderSize;
+
+ /// <summary>
+ /// Gets the current element index.
+ /// </summary>
+ public int ElementIndex => _elementIndex;
+
+ /// <summary>
+ /// Appends a null value.
+ /// </summary>
+ public void AppendNull()
+ {
+ if (!HasNullMap)
+ {
+ throw new InvalidOperationException("Appending a NULL value in binary tuple builder with disabled NULLs");
+ }
+
+ _hasNullValues = true;
+
+ int nullIndex = BinaryTupleCommon.NullOffset(_elementIndex);
+ byte nullMask = BinaryTupleCommon.NullMask(_elementIndex);
+
+ _buffer.GetSpan(nullIndex, 1)[0] |= nullMask;
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a byte.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendByte(sbyte value)
+ {
+ if (value != 0)
+ {
+ PutByte(value);
+ }
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a short.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendShort(short value)
+ {
+ if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
+ {
+ AppendByte((sbyte)value);
+ }
+ else
+ {
+ PutShort(value);
+ OnWrite();
+ }
+ }
+
+ /// <summary>
+ /// Appends an int.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendInt(int value)
+ {
+ if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
+ {
+ AppendByte((sbyte)value);
+ return;
+ }
+
+ if (value >= short.MinValue && value <= short.MaxValue)
+ {
+ PutShort((short)value);
+ }
+ else
+ {
+ PutInt(value);
+ }
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a long.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendLong(long value)
+ {
+ if (value >= short.MinValue && value <= short.MaxValue)
+ {
+ AppendShort((short)value);
+ return;
+ }
+
+ if (value >= int.MinValue && value <= int.MaxValue)
+ {
+ PutInt((int)value);
+ }
+ else
+ {
+ PutLong(value);
+ }
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a float.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendFloat(float value)
+ {
+ if (value != 0.0F)
+ {
+ PutFloat(value);
+ }
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a double.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendDouble(double value)
+ {
+ // ReSharper disable once CompareOfFloatsByEqualityOperator
+ if (value == (float)value)
+ {
+ AppendFloat((float)value);
+ return;
+ }
+
+ PutDouble(value);
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a string.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendString(string value)
+ {
+ PutString(value);
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends bytes.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendBytes(Span<byte> value)
+ {
+ PutBytes(value);
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Appends a guid.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ public void AppendGuid(Guid value)
+ {
+ if (value != default)
+ {
+ UuidSerializer.Write(value, GetSpan(16));
+ }
+
+ OnWrite();
+ }
+
+ /// <summary>
+ /// Builds the tuple.
+ /// <para />
+ /// NOTE: This should be called only once, because it modifies the accumulated internal data in place.
+ /// </summary>
+ /// <returns>Resulting memory.</returns>
+ public Memory<byte> Build()
+ {
+ int offset = 0;
+
+ int valueSize = _buffer.Position - _valueBase;
+ byte flags = BinaryTupleCommon.ValueSizeToFlags(valueSize);
+ int desiredEntrySize = BinaryTupleCommon.FlagsToEntrySize(flags);
+
+ // Shrink the offset table if needed.
+ if (desiredEntrySize != _entrySize)
+ {
+ if (desiredEntrySize > _entrySize)
+ {
+ throw new InvalidOperationException("Offset entry overflow in binary tuple builder");
+ }
+
+ Debug.Assert(_entrySize == 4 || _entrySize == 2, "_entrySize == 4 || _entrySize == 2");
+ Debug.Assert(desiredEntrySize == 2 || desiredEntrySize == 1, "desiredEntrySize == 2 || desiredEntrySize == 1");
+
+ int getIndex = _valueBase;
+ int putIndex = _valueBase;
+ while (getIndex > _entryBase)
+ {
+ getIndex -= _entrySize;
+ putIndex -= desiredEntrySize;
+
+ var value = _entrySize == 4
+ ? _buffer.ReadInt(getIndex)
+ : _buffer.ReadShort(getIndex);
+
+ if (desiredEntrySize == 1)
+ {
+ _buffer.WriteByte((byte)value, putIndex);
+ }
+ else
+ {
+ _buffer.WriteShort((short)value, putIndex);
+ }
+ }
+
+ offset = (_entrySize - desiredEntrySize) * _numElements;
+ }
+
+ // Drop or move null map if needed.
+ if (HasNullMap)
+ {
+ if (!_hasNullValues)
+ {
+ offset += BinaryTupleCommon.NullMapSize(_numElements);
+ }
+ else
+ {
+ flags |= BinaryTupleCommon.NullmapFlag;
+ if (offset != 0)
+ {
+ int n = BinaryTupleCommon.NullMapSize(_numElements);
+ for (int i = BinaryTupleCommon.HeaderSize + n - 1; i >= BinaryTupleCommon.HeaderSize; i--)
+ {
+ _buffer.WriteByte(_buffer.ReadByte(i), i + offset);
+ }
+ }
+ }
+ }
+
+ _buffer.WriteByte(flags, offset);
+
+ return _buffer.GetWrittenMemory().Slice(offset);
+ }
+
+ /// <inheritdoc/>
+ public void Dispose()
+ {
+ _buffer.Dispose();
+ }
+
+ private void PutByte(sbyte value) => _buffer.WriteByte(unchecked((byte)value));
+
+ private void PutShort(short value) => _buffer.WriteShort(value);
+
+ private void PutInt(int value) => _buffer.WriteInt(value);
+
+ private void PutLong(long value) => _buffer.WriteLong(value);
+
+ private void PutFloat(float value) => PutInt(BitConverter.SingleToInt32Bits(value));
+
+ private void PutDouble(double value) => PutLong(BitConverter.DoubleToInt64Bits(value));
+
+ private void PutBytes(Span<byte> bytes) => bytes.CopyTo(GetSpan(bytes.Length));
+
+ private void PutString(string value)
+ {
+ if (value.Length == 0)
+ {
+ return;
+ }
+
+ var maxByteCount = Encoding.UTF8.GetMaxByteCount(value.Length);
+ var span = _buffer.GetSpan(maxByteCount);
+
+ var actualBytes = Encoding.UTF8.GetBytes(value, span);
+
+ _buffer.Advance(actualBytes);
+ }
+
+ /// <summary>
+ /// Proceed to the next tuple element.
+ /// </summary>
+ private void OnWrite()
+ {
+ Debug.Assert(_elementIndex < _numElements, "_elementIndex < _numElements");
+
+ int offset = _buffer.Position - _valueBase;
+
+ switch (_entrySize)
+ {
+ case 1:
+ _buffer.WriteByte((byte)offset, _entryBase + _elementIndex);
+ break;
+
+ case 2:
+ _buffer.WriteShort((short)offset, _entryBase + _elementIndex * 2);
+ break;
+
+ case 4:
+ _buffer.WriteInt(offset, _entryBase + _elementIndex * 4);
+ break;
+
+ default:
+ throw new InvalidOperationException("Tuple entry size is invalid.");
+ }
+
+ _elementIndex++;
+ }
+
+ private Span<byte> GetSpan(int size)
+ {
+ var span = _buffer.GetSpan(size);
+
+ _buffer.Advance(size);
+
+ return span;
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
new file mode 100644
index 0000000000..6bfb494e13
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleCommon.cs
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto.BinaryTuple
+{
+ /// <summary>
+ /// Common binary tuple constants and utils.
+ /// </summary>
+ internal static class BinaryTupleCommon
+ {
+ /// <summary>
+ /// Size of a tuple header, in bytes.
+ /// </summary>
+ public const int HeaderSize = 1;
+
+ /// <summary>
+ /// Mask for size of entries in variable-length offset table.
+ /// </summary>
+ public const int VarsizeMask = 0b011;
+
+ /// <summary>
+ /// Flag that indicates null map presence.
+ /// </summary>
+ public const int NullmapFlag = 0b100;
+
+ /// <summary>
+ /// Calculates flags for a given size of variable-length area.
+ /// </summary>
+ /// <param name="size">Variable-length area size.</param>
+ /// <returns>Flags value.</returns>
+ public static byte ValueSizeToFlags(long size)
+ {
+ if (size <= 0xff)
+ {
+ return 0b00;
+ }
+
+ if (size <= 0xffff)
+ {
+ return 0b01;
+ }
+
+ if (size <= int.MaxValue)
+ {
+ return 0b10;
+ }
+
+ throw new IgniteClientException("Too big binary tuple size");
+ }
+
+ /// <summary>
+ /// Calculates the size of entry in variable-length offset table for given flags.
+ /// </summary>
+ /// <param name="flags">Flags.</param>
+ /// <returns>Size of entry in variable-length offset table.</returns>
+ public static int FlagsToEntrySize(byte flags)
+ {
+ return 1 << (flags & VarsizeMask);
+ }
+
+ /// <summary>
+ /// Calculates the null map size.
+ /// </summary>
+ /// <param name="numElements">Number of tuple elements.</param>
+ /// <returns>Null map size in bytes.</returns>
+ public static int NullMapSize(int numElements)
+ {
+ return (numElements + 7) / 8;
+ }
+
+ /// <summary>
+ /// Returns offset of the byte that contains null-bit of a given tuple element.
+ /// </summary>
+ /// <param name="index">Tuple element index.</param>
+ /// <returns>Offset of the required byte relative to the tuple start.</returns>
+ public static int NullOffset(int index)
+ {
+ return HeaderSize + index / 8;
+ }
+
+ /// <summary>
+ /// Returns a null-bit mask corresponding to a given tuple element.
+ /// </summary>
+ /// <param name="index">Tuple element index.</param>
+ /// <returns>Mask to extract the required null-bit.</returns>
+ public static byte NullMask(int index)
+ {
+ return (byte)(1 << (index % 8));
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
new file mode 100644
index 0000000000..36cabae670
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleReader.cs
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto.BinaryTuple
+{
+ using System;
+ using System.Buffers.Binary;
+ using System.Diagnostics;
+ using System.Text;
+
+ /// <summary>
+ /// Binary tuple reader.
+ /// </summary>
+ internal sealed class BinaryTupleReader // TODO: Support all types (IGNITE-15431).
+ {
+ /** Buffer. */
+ private readonly ReadOnlyMemory<byte> _buffer;
+
+ /** Number of elements in the tuple. */
+ private readonly int _numElements;
+
+ /** Size of an offset table entry. */
+ private readonly int _entrySize;
+
+ /** Position of the varlen offset table. */
+ private readonly int _entryBase;
+
+ /** Starting position of variable-length values. */
+ private readonly int _valueBase;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BinaryTupleReader"/> class.
+ /// </summary>
+ /// <param name="buffer">Buffer.</param>
+ /// <param name="numElements">Number of elements in the tuple.</param>
+ public BinaryTupleReader(ReadOnlyMemory<byte> buffer, int numElements)
+ {
+ _buffer = buffer;
+ _numElements = numElements;
+
+ var flags = buffer.Span[0];
+
+ int @base = BinaryTupleCommon.HeaderSize;
+
+ if ((flags & BinaryTupleCommon.NullmapFlag) != 0)
+ {
+ @base += BinaryTupleCommon.NullMapSize(numElements);
+ }
+
+ _entryBase = @base;
+ _entrySize = 1 << (flags & BinaryTupleCommon.VarsizeMask);
+ _valueBase = @base + _entrySize * numElements;
+ }
+
+ /// <summary>
+ /// Gets a value indicating whether this reader has a null map.
+ /// </summary>
+ public bool HasNullMap => _entryBase > BinaryTupleCommon.HeaderSize;
+
+ /// <summary>
+ /// Gets a value indicating whether the element at specified index is null.
+ /// </summary>
+ /// <param name="index">Element index.</param>
+ /// <returns>True when the element is null; false otherwise.</returns>
+ public bool IsNull(int index)
+ {
+ if (!HasNullMap)
+ {
+ return false;
+ }
+
+ int nullIndex = BinaryTupleCommon.NullOffset(index);
+ byte nullMask = BinaryTupleCommon.NullMask(index);
+
+ return (_buffer.Span[nullIndex] & nullMask) != 0;
+ }
+
+ /// <summary>
+ /// Gets a byte value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public sbyte GetByte(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ var s => unchecked((sbyte)s[0])
+ };
+
+ /// <summary>
+ /// Gets a short value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public short GetShort(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ { Length: 1 } s => unchecked((sbyte)s[0]),
+ var s => BinaryPrimitives.ReadInt16LittleEndian(s)
+ };
+
+ /// <summary>
+ /// Gets an int value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public int GetInt(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ { Length: 1 } s => unchecked((sbyte)s[0]),
+ { Length: 2 } s => BinaryPrimitives.ReadInt16LittleEndian(s),
+ var s => BinaryPrimitives.ReadInt32LittleEndian(s)
+ };
+
+ /// <summary>
+ /// Gets a long value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public long GetLong(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ { Length: 1 } s => unchecked((sbyte)s[0]),
+ { Length: 2 } s => BinaryPrimitives.ReadInt16LittleEndian(s),
+ { Length: 4 } s => BinaryPrimitives.ReadInt32LittleEndian(s),
+ var s => BinaryPrimitives.ReadInt64LittleEndian(s)
+ };
+
+ /// <summary>
+ /// Gets a Guid value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public Guid GetGuid(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ var s => UuidSerializer.Read(s)
+ };
+
+ /// <summary>
+ /// Gets a string value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public string GetString(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => string.Empty,
+ var s => Encoding.UTF8.GetString(s)
+ };
+
+ /// <summary>
+ /// Gets a float value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public float GetFloat(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ var s => BitConverter.Int32BitsToSingle(BinaryPrimitives.ReadInt32LittleEndian(s))
+ };
+
+ /// <summary>
+ /// Gets a double value.
+ /// </summary>
+ /// <param name="index">Index.</param>
+ /// <returns>Value.</returns>
+ public double GetDouble(int index) => Seek(index) switch
+ {
+ { IsEmpty: true } => default,
+ { Length: 4 } s => BitConverter.Int32BitsToSingle(BinaryPrimitives.ReadInt32LittleEndian(s)),
+ var s => BitConverter.Int64BitsToDouble(BinaryPrimitives.ReadInt64LittleEndian(s))
+ };
+
+ private int GetOffset(int position)
+ {
+ var span = _buffer.Span[position..];
+
+ switch (_entrySize)
+ {
+ case 1:
+ return span[0];
+
+ case 2:
+ return BinaryPrimitives.ReadUInt16LittleEndian(span);
+
+ case 4:
+ {
+ var offset = BinaryPrimitives.ReadInt32LittleEndian(span);
+
+ if (offset < 0)
+ {
+ throw new InvalidOperationException("Unsupported offset table size");
+ }
+
+ return offset;
+ }
+
+ default:
+ throw new InvalidOperationException("Invalid offset table size");
+ }
+ }
+
+ private ReadOnlySpan<byte> Seek(int index)
+ {
+ Debug.Assert(index >= 0, "index >= 0");
+ Debug.Assert(index < _numElements, "index < numElements");
+
+ int entry = _entryBase + index * _entrySize;
+
+ int offset = _valueBase;
+ if (index > 0)
+ {
+ offset += GetOffset(entry - _entrySize);
+ }
+
+ int nextOffset = _valueBase + GetOffset(entry);
+
+ if (nextOffset < offset)
+ {
+ throw new InvalidOperationException("Corrupted offset table");
+ }
+
+ if (offset == nextOffset && IsNull(index))
+ {
+ throw new InvalidOperationException($"Binary tuple element with index {index} is null.");
+ }
+
+ return _buffer.Span.Slice(offset, nextOffset - offset);
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackReaderExtensions.cs
index 146c3d8dbb..d694abe61b 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackReaderExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackReaderExtensions.cs
@@ -19,7 +19,6 @@ namespace Apache.Ignite.Internal.Proto
{
using System;
using System.Buffers;
- using System.Buffers.Binary;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using MessagePack;
@@ -126,20 +125,7 @@ namespace Apache.Ignite.Internal.Proto
Debug.Assert(jBytes.Length == guidSize, "jBytes.Length == 16");
- // Hoist bounds checks.
- var k = jBytes[15];
- var a = BinaryPrimitives.ReadInt32BigEndian(jBytes);
- var b = BinaryPrimitives.ReadInt16BigEndian(jBytes[4..]);
- var c = BinaryPrimitives.ReadInt16BigEndian(jBytes[6..]);
- var d = jBytes[8];
- var e = jBytes[9];
- var f = jBytes[10];
- var g = jBytes[11];
- var h = jBytes[12];
- var i = jBytes[13];
- var j = jBytes[14];
-
- return new Guid(a, b, c, d, e, f, g, h, i, j, k);
+ return UuidSerializer.Read(jBytes);
}
/// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackWriterExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackWriterExtensions.cs
index 5a5bde15e8..9428590ec7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackWriterExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MessagePackWriterExtensions.cs
@@ -18,7 +18,6 @@
namespace Apache.Ignite.Internal.Proto
{
using System;
- using System.Diagnostics;
using MessagePack;
using Transactions;
@@ -39,33 +38,9 @@ namespace Apache.Ignite.Internal.Proto
{
writer.WriteExtensionFormatHeader(new ExtensionHeader((sbyte)ClientMessagePackType.Uuid, 16));
- Span<byte> jBytes = writer.GetSpan(16);
+ var jBytes = writer.GetSpan(16);
- var written = guid.TryWriteBytes(jBytes);
- Debug.Assert(written, "written");
-
- if (BitConverter.IsLittleEndian)
- {
- var c1 = jBytes[7];
- var c2 = jBytes[6];
-
- var b1 = jBytes[5];
- var b2 = jBytes[4];
-
- var a1 = jBytes[3];
- var a2 = jBytes[2];
- var a3 = jBytes[1];
- var a4 = jBytes[0];
-
- jBytes[0] = a1;
- jBytes[1] = a2;
- jBytes[2] = a3;
- jBytes[3] = a4;
- jBytes[4] = b1;
- jBytes[5] = b2;
- jBytes[6] = c1;
- jBytes[7] = c2;
- }
+ UuidSerializer.Write(guid, jBytes);
writer.Advance(16);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
index f290e53fe1..3a077f4c50 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ProtoCommon.cs
@@ -17,14 +17,27 @@
namespace Apache.Ignite.Internal.Proto
{
+ using Buffers;
+
/// <summary>
/// Common protocol data.
/// </summary>
internal static class ProtoCommon
{
+ /// <summary>
+ /// Message prefix size.
+ /// </summary>
+ public const int MessagePrefixSize = 4 + 5 + 9; // Size (4 bytes) + OpCode (5 bytes) + RequestId (9 bytes).
+
/// <summary>
/// Magic bytes.
/// </summary>
public static readonly byte[] MagicBytes = { (byte)'I', (byte)'G', (byte)'N', (byte)'I' };
+
+ /// <summary>
+ /// Gets a new message writer.
+ /// </summary>
+ /// <returns>Message writer.</returns>
+ public static PooledArrayBufferWriter GetMessageWriter() => new(prefixSize: MessagePrefixSize);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/UuidSerializer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/UuidSerializer.cs
new file mode 100644
index 0000000000..e98046d7e8
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/UuidSerializer.cs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Proto
+{
+ using System;
+ using System.Buffers.Binary;
+ using System.Diagnostics;
+ using System.Runtime.InteropServices;
+
+ /// <summary>
+ /// Serializes and deserializes Guids in Java-specific UUID format.
+ /// </summary>
+ internal static class UuidSerializer
+ {
+ /// <summary>
+ /// Writes Guid as Java UUID.
+ /// </summary>
+ /// <param name="guid">Guid.</param>
+ /// <param name="span">Target span.</param>
+ public static void Write(Guid guid, Span<byte> span)
+ {
+ var written = guid.TryWriteBytes(span);
+ Debug.Assert(written, "written");
+
+ // Reverse endianness of the first three components (int, short, short).
+ var a = BinaryPrimitives.ReverseEndianness(MemoryMarshal.Read<int>(span));
+ MemoryMarshal.Write(span, ref a);
+
+ var b = BinaryPrimitives.ReverseEndianness(MemoryMarshal.Read<short>(span[4..]));
+ MemoryMarshal.Write(span[4..], ref b);
+
+ var c = BinaryPrimitives.ReverseEndianness(MemoryMarshal.Read<short>(span[6..]));
+ MemoryMarshal.Write(span[6..], ref c);
+ }
+
+ /// <summary>
+ /// Reads Java UUID as Guid.
+ /// </summary>
+ /// <param name="span">Span.</param>
+ /// <returns>Guid.</returns>
+ public static Guid Read(ReadOnlySpan<byte> span)
+ {
+ // Hoist bounds checks.
+ var k = span[15];
+ var a = BinaryPrimitives.ReverseEndianness(MemoryMarshal.Read<int>(span));
+ var b = BinaryPrimitives.ReverseEndianness(MemoryMarshal.Read<short>(span[4..]));
+ var c = BinaryPrimitives.ReverseEndianness(MemoryMarshal.Read<short>(span[6..]));
+ var d = span[8];
+ var e = span[9];
+ var f = span[10];
+ var g = span[11];
+ var h = span[12];
+ var i = span[13];
+ var j = span[14];
+
+ return new Guid(a, b, c, d, e, f, g, h, i, j, k);
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
index b13cbf61f5..dddc0cead4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
@@ -165,7 +165,7 @@ namespace Apache.Ignite.Internal.Sql
if (_resourceId != null && !_resourceClosed)
{
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
WriteId(writer.GetMessageWriter());
await _socket.DoOutInOpAsync(ClientOp.SqlCursorClose, writer).ConfigureAwait(false);
@@ -339,7 +339,7 @@ namespace Apache.Ignite.Internal.Sql
private async Task<PooledBuffer> FetchNextPage()
{
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
WriteId(writer.GetMessageWriter());
return await _socket.DoOutInOpAsync(ClientOp.SqlCursorNextPage, writer).ConfigureAwait(false);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index bfe0124c48..48d2f9fdba 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -59,7 +59,7 @@ namespace Apache.Ignite.Internal.Sql
PooledArrayBufferWriter Write()
{
- var writer = new PooledArrayBufferWriter();
+ var writer = ProtoCommon.GetMessageWriter();
var w = writer.GetMessageWriter();
w.WriteTx(tx);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 184ea23522..0e6b9c2dff 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -83,7 +83,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.WriteMultiple(writer, tx, schema, iterator, keyOnly: true);
using var resBuf = await DoOutInOpAsync(ClientOp.TupleGetAll, tx, writer).ConfigureAwait(false);
@@ -116,7 +116,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.WriteMultiple(writer, tx, schema, iterator);
using var resBuf = await DoOutInOpAsync(ClientOp.TupleUpsertAll, tx, writer).ConfigureAwait(false);
@@ -157,7 +157,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.WriteMultiple(writer, tx, schema, iterator);
using var resBuf = await DoOutInOpAsync(ClientOp.TupleInsertAll, tx, writer).ConfigureAwait(false);
@@ -184,7 +184,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.WriteTwo(writer, tx, schema, record, newRecord);
using var resBuf = await DoOutInOpAsync(ClientOp.TupleReplaceExact, tx, writer).ConfigureAwait(false);
@@ -246,7 +246,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.WriteMultiple(writer, tx, schema, iterator, keyOnly: true);
using var resBuf = await DoOutInOpAsync(ClientOp.TupleDeleteAll, tx, writer).ConfigureAwait(false);
@@ -271,7 +271,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.WriteMultiple(writer, tx, schema, iterator);
using var resBuf = await DoOutInOpAsync(ClientOp.TupleDeleteAllExact, tx, writer).ConfigureAwait(false);
@@ -297,7 +297,7 @@ namespace Apache.Ignite.Internal.Table
var schema = await _table.GetLatestSchemaAsync().ConfigureAwait(false);
var tx = transaction.ToInternal();
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
_ser.Write(writer, tx, schema, record, keyOnly);
return await DoOutInOpAsync(op, tx, writer).ConfigureAwait(false);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index 88182e15b2..7913304cbe 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -18,6 +18,7 @@
namespace Apache.Ignite.Internal.Table.Serialization
{
using System;
+ using System.Buffers.Binary;
using System.Collections.Generic;
using Buffers;
using MessagePack;
@@ -206,7 +207,8 @@ namespace Apache.Ignite.Internal.Table.Serialization
w.Flush();
var count = 0;
- var countPos = buf.ReserveInt32();
+ var countSpan = buf.GetSpan(5);
+ buf.Advance(5);
do
{
@@ -222,7 +224,8 @@ namespace Apache.Ignite.Internal.Table.Serialization
}
while (recs.MoveNext()); // First MoveNext is called outside to check for empty IEnumerable.
- buf.WriteInt32(countPos, count);
+ countSpan[0] = MessagePackCode.Int32;
+ BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
w.Flush();
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index ff83923508..d40206d0a1 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -151,7 +151,7 @@ namespace Apache.Ignite.Internal.Table
/// <returns>Schema.</returns>
private async Task<Schema> LoadSchemaAsync(int? version)
{
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
Write();
using var resBuf = await _socket.DoOutInOpAsync(ClientOp.SchemasGet, writer).ConfigureAwait(false);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
index ef8ddd8f03..12e9582494 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Tables.cs
@@ -21,7 +21,6 @@ namespace Apache.Ignite.Internal.Table
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
- using Buffers;
using Common;
using Ignite.Table;
using MessagePack;
@@ -92,7 +91,7 @@ namespace Apache.Ignite.Internal.Table
{
IgniteArgumentCheck.NotNull(name, nameof(name));
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
Write(writer.GetMessageWriter());
using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TableGet, writer).ConfigureAwait(false);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
index f06d1504d2..e284304176 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
@@ -20,7 +20,6 @@ namespace Apache.Ignite.Internal.Transactions
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
- using Buffers;
using Ignite.Transactions;
using MessagePack;
using Proto;
@@ -75,7 +74,7 @@ namespace Apache.Ignite.Internal.Transactions
{
SetState(StateCommitted);
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
Write(writer.GetMessageWriter());
await Socket.DoOutInOpAsync(ClientOp.TxCommit, writer).ConfigureAwait(false);
@@ -104,7 +103,7 @@ namespace Apache.Ignite.Internal.Transactions
/// </summary>
private async Task RollbackAsyncInternal()
{
- using var writer = new PooledArrayBufferWriter();
+ using var writer = ProtoCommon.GetMessageWriter();
Write(writer.GetMessageWriter());
await Socket.DoOutInOpAsync(ClientOp.TxRollback, writer).ConfigureAwait(false);