You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/12/04 08:05:11 UTC

[pulsar-dotpulsar] branch master updated: NetworkStream doesn't buffer data, so to minimize context switches when sending on the socket, we create larger chunks from small 'Memory' segments in the sequence.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e5c3d26  NetworkStream doesn't buffer data, so to minimize context switches when sending on the socket, we create larger chunks from small 'Memory' segments in the sequence.
e5c3d26 is described below

commit e5c3d26d60826af5db32b0d95a1078e7afa55e26
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Fri Dec 4 09:04:55 2020 +0100

    NetworkStream doesn't buffer data, so to minimize context switches when sending on the socket, we create larger chunks from small 'Memory' segments in the sequence.
---
 src/DotPulsar/Internal/ChunkingPipeline.cs         | 106 +++++++++++++++++++
 src/DotPulsar/Internal/PulsarStream.cs             |  19 +---
 src/DotPulsar/Internal/SequenceBuilder.cs          |   2 +-
 .../DotPulsar.StressTests.csproj                   |   2 +-
 tests/DotPulsar.Tests/DotPulsar.Tests.csproj       |   2 +-
 .../Internal/ChunkingPipelineTests.cs              | 117 +++++++++++++++++++++
 6 files changed, 231 insertions(+), 17 deletions(-)

diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs b/src/DotPulsar/Internal/ChunkingPipeline.cs
new file mode 100644
index 0000000..74ecf31
--- /dev/null
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using System;
+    using System.Buffers;
+    using System.IO;
+    using System.Threading.Tasks;
+
+    /// <summary>
+    /// Writes a <see cref="ReadOnlySequence{T}"/> of bytes to a <see cref="Stream"/>, copying small
+    /// segments into a reusable buffer so that they are written as one larger chunk.
+    /// </summary>
+    /// <remarks>
+    /// The buffer and its fill level are instance state that is mutated across awaits, so calls to
+    /// <see cref="Send(ReadOnlySequence{byte})"/> must not overlap.
+    /// </remarks>
+    public sealed class ChunkingPipeline
+    {
+        private readonly Stream _stream;
+        private readonly int _chunkSize;
+        private readonly byte[] _buffer;
+        private int _bufferCount;
+
+        /// <summary>
+        /// Creates a pipeline that writes to <paramref name="stream"/>, coalescing segments into writes of at most
+        /// <paramref name="chunkSize"/> bytes (segments larger than <paramref name="chunkSize"/> are written as they are).
+        /// </summary>
+        /// <param name="stream">The stream to write to.</param>
+        /// <param name="chunkSize">The size of the coalescing buffer; 0 writes every segment individually.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is negative.</exception>
+        public ChunkingPipeline(Stream stream, int chunkSize)
+        {
+            if (stream is null)
+                throw new ArgumentNullException(nameof(stream));
+
+            if (chunkSize < 0)
+                throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, "The chunk size cannot be negative.");
+
+            _stream = stream;
+            _chunkSize = chunkSize;
+            _buffer = new byte[_chunkSize];
+        }
+
+        /// <summary>Copies the whole sequence to the start of the buffer.</summary>
+        private void CopyToBuffer(ReadOnlySequence<byte> sequence) => sequence.CopyTo(_buffer.AsSpan());
+
+        /// <summary>Appends a segment to the buffer, after the bytes already buffered.</summary>
+        private void CopyToBuffer(ReadOnlyMemory<byte> memory) => memory.CopyTo(_buffer.AsMemory(_bufferCount));
+
+        /// <summary>
+        /// Writes all bytes of <paramref name="sequence"/> to the stream, in order.
+        /// </summary>
+        /// <remarks>
+        /// A sequence no longer than the chunk size is copied and written with a single write. Otherwise,
+        /// segments larger than the chunk size are written directly, and smaller segments are buffered when
+        /// they can be combined with neighbouring segments.
+        /// </remarks>
+        public async ValueTask Send(ReadOnlySequence<byte> sequence)
+        {
+            var sequenceLength = sequence.Length;
+
+            if (sequenceLength <= _chunkSize)
+            {
+                CopyToBuffer(sequence);
+                _bufferCount = (int) sequenceLength;
+                await SendBuffer().ConfigureAwait(false);
+                return;
+            }
+
+            var enumerator = sequence.GetEnumerator();
+            // Current is empty until MoveNext has been called, so advance before the loop instead of
+            // processing (and possibly writing) an empty segment.
+            var hasNext = enumerator.MoveNext();
+
+            while (hasNext)
+            {
+                var current = enumerator.Current;
+                var currentLength = current.Length;
+                hasNext = enumerator.MoveNext(); // Look ahead so the next segment's length is known below.
+
+                if (currentLength == 0)
+                    continue; // Nothing to write; avoid zero-length writes to the stream.
+
+                if (currentLength > _chunkSize)
+                {
+                    await Send(current).ConfigureAwait(false);
+                    continue;
+                }
+
+                // Written as a subtraction so the comparison cannot overflow for very large chunk sizes.
+                if (currentLength > _chunkSize - _bufferCount)
+                    await SendBuffer().ConfigureAwait(false);
+
+                // Computed after a possible flush, so 'current' can still be merged with the next segment.
+                var total = currentLength + _bufferCount;
+
+                if (_bufferCount != 0 || (hasNext && enumerator.Current.Length <= _chunkSize - total))
+                {
+                    CopyToBuffer(current);
+                    _bufferCount = total;
+                    continue;
+                }
+
+                await Send(current).ConfigureAwait(false);
+            }
+
+            await SendBuffer().ConfigureAwait(false);
+        }
+
+        /// <summary>Writes the buffered bytes, if any, and marks the buffer as empty.</summary>
+        private async ValueTask SendBuffer()
+        {
+            if (_bufferCount == 0)
+                return;
+
+            try
+            {
+                await _stream.WriteAsync(_buffer, 0, _bufferCount).ConfigureAwait(false);
+            }
+            finally
+            {
+                // Reset even if the write fails, so stale bytes are never prepended to a later send.
+                _bufferCount = 0;
+            }
+        }
+
+        /// <summary>Flushes the buffer and then writes <paramref name="memory"/> directly to the stream.</summary>
+        private async ValueTask Send(ReadOnlyMemory<byte> memory)
+        {
+            await SendBuffer().ConfigureAwait(false);
+
+#if NETSTANDARD2_0
+            // Stream.WriteAsync(ReadOnlyMemory<byte>) is not available on this target: write the backing
+            // array directly when there is one, and only fall back to copying otherwise.
+            if (System.Runtime.InteropServices.MemoryMarshal.TryGetArray(memory, out var segment))
+            {
+                await _stream.WriteAsync(segment.Array, segment.Offset, segment.Count).ConfigureAwait(false);
+            }
+            else
+            {
+                var data = memory.ToArray();
+                await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
+            }
+#else
+            await _stream.WriteAsync(memory).ConfigureAwait(false);
+#endif
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 01a6062..867f50f 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -30,8 +30,10 @@ namespace DotPulsar.Internal
     {
         private const long _pauseAtMoreThan10Mb = 10485760;
         private const long _resumeAt5MbOrLess = 5242881;
+        private const int _chunkSize = 75000;
 
         private readonly Stream _stream;
+        private readonly ChunkingPipeline _pipeline;
         private readonly PipeReader _reader;
         private readonly PipeWriter _writer;
         private int _isDisposed;
@@ -39,6 +41,7 @@ namespace DotPulsar.Internal
         public PulsarStream(Stream stream)
         {
             _stream = stream;
+            _pipeline = new ChunkingPipeline(stream, _chunkSize);
             var options = new PipeOptions(pauseWriterThreshold: _pauseAtMoreThan10Mb, resumeWriterThreshold: _resumeAt5MbOrLess);
             var pipe = new Pipe(options);
             _reader = pipe.Reader;
@@ -48,19 +51,7 @@ namespace DotPulsar.Internal
         public async Task Send(ReadOnlySequence<byte> sequence)
         {
             ThrowIfDisposed();
-
-#if NETSTANDARD2_0
-            foreach (var segment in sequence)
-            {
-                var data = segment.ToArray();
-                await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
-            }
-#else
-            foreach (var segment in sequence)
-            {
-                await _stream.WriteAsync(segment).ConfigureAwait(false);
-            }
-#endif
+            await _pipeline.Send(sequence).ConfigureAwait(false);
         }
 
 #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
@@ -88,7 +79,7 @@ namespace DotPulsar.Internal
 #endif
                 while (true)
                 {
-                    var memory = _writer.GetMemory(84999); // LOH - 1 byte
+                    var memory = _writer.GetMemory(84999);
 #if NETSTANDARD2_0
                     var bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
                     new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
index b2433ea..d7337b2 100644
--- a/src/DotPulsar/Internal/SequenceBuilder.cs
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -66,7 +66,7 @@ namespace DotPulsar.Internal
         {
             var node = _elements.First;
             if (node is null)
-                return new ReadOnlySequence<T>();
+                return ReadOnlySequence<T>.Empty;
 
             var current = new Segment(node.Value);
             var start = current;
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 5650dd6..bdf1d1a 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -6,7 +6,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
       <PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index a3e617c..74cb6b9 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,7 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="FluentAssertions" Version="5.10.3" />
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
       <PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
new file mode 100644
index 0000000..de3895b
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using FluentAssertions;
+    using System;
+    using System.Buffers;
+    using System.IO;
+    using System.Linq;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    /// <summary>
+    /// Verifies that <see cref="ChunkingPipeline"/> writes every byte in order while coalescing segments.
+    /// </summary>
+    public class ChunkingPipelineTests
+    {
+        [Fact]
+        public async Task Send_GivenSequenceIsUnderChunkSize_ShouldWriteArrayOnce()
+        {
+            //Arrange
+            var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+            var b = new byte[] { 0x04, 0x05, 0x06, 0x07 };
+            var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Build();
+            var mockStream = new MockStream();
+            var sut = new ChunkingPipeline(mockStream, 9);
+
+            //Act
+            await sut.Send(sequence);
+
+            //Assert
+            var expected = sequence.ToArray();
+            var actual = mockStream.GetReadOnlySequence();
+            actual.ToArray().Should().Equal(expected);
+            actual.IsSingleSegment.Should().BeTrue();
+        }
+
+        [Theory]
+        [InlineData(4, 6, 3, 4, 6, 3)]     // No segments can be merged
+        [InlineData(1, 6, 4, 7, 4, null)]  // Can merge a and b
+        [InlineData(4, 6, 1, 4, 7, null)]  // Can merge b and c
+        public async Task Send_GivenSequenceIsOverChunkSize_ShouldWriteMultipleArrays(int length1, int length2, int length3, int expected1, int expected2, int? expected3)
+        {
+            //Arrange
+            var a = Enumerable.Range(0, length1).Select(i => (byte) i).ToArray();
+            var b = Enumerable.Range(length1, length2).Select(i => (byte) i).ToArray();
+            var c = Enumerable.Range(length1 + length2, length3).Select(i => (byte) i).ToArray();
+            var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Append(c).Build();
+            var mockStream = new MockStream();
+            var sut = new ChunkingPipeline(mockStream, 8);
+
+            //Act
+            await sut.Send(sequence);
+
+            //Assert
+            var expected = sequence.ToArray();
+            var actual = mockStream.GetReadOnlySequence();
+            actual.ToArray().Should().Equal(expected);
+            GetNumberOfSegments(actual).Should().Be(expected3.HasValue ? 3 : 2);
+
+            var segmentNumber = 0;
+            foreach (var segment in actual)
+            {
+                switch (segmentNumber)
+                {
+                    case 0:
+                        segment.Length.Should().Be(expected1);
+                        break;
+                    case 1:
+                        segment.Length.Should().Be(expected2);
+                        break;
+                    case 2:
+                        expected3.Should().NotBeNull();
+                        segment.Length.Should().Be(expected3);
+                        break;
+                }
+                ++segmentNumber;
+            }
+        }
+
+        [Theory]
+        [InlineData(new[] { 2, 2, 6, 3, 3 }, new[] { 4, 6, 6 })]   // The internal buffer is flushed and later reused
+        [InlineData(new[] { 2, 2, 10, 3, 3 }, new[] { 4, 10, 6 })] // A segment over the chunk size is written as is
+        public async Task Send_GivenBufferIsReused_ShouldWriteExpectedChunks(int[] segmentLengths, int[] expectedWriteLengths)
+        {
+            //Arrange
+            var builder = new SequenceBuilder<byte>();
+            var offset = 0;
+            foreach (var length in segmentLengths)
+            {
+                builder.Append(Enumerable.Range(offset, length).Select(i => (byte) i).ToArray());
+                offset += length;
+            }
+            var sequence = builder.Build();
+            var mockStream = new MockStream();
+            var sut = new ChunkingPipeline(mockStream, 8);
+
+            //Act
+            await sut.Send(sequence);
+
+            //Assert
+            var expected = sequence.ToArray();
+            var actual = mockStream.GetReadOnlySequence();
+            actual.ToArray().Should().Equal(expected);
+            GetSegmentLengths(actual).Should().Equal(expectedWriteLengths);
+        }
+
+        private static int GetNumberOfSegments(ReadOnlySequence<byte> sequence)
+        {
+            var numberOfSegments = 0;
+            foreach (var segment in sequence)
+                ++numberOfSegments;
+            return numberOfSegments;
+        }
+
+        /// <summary>Returns the length of each segment in <paramref name="sequence"/>, in order.</summary>
+        private static int[] GetSegmentLengths(ReadOnlySequence<byte> sequence)
+        {
+            var lengths = new int[GetNumberOfSegments(sequence)];
+            var index = 0;
+            foreach (var segment in sequence)
+                lengths[index++] = segment.Length;
+            return lengths;
+        }
+
+        /// <summary>
+        /// A write-only stream that records every synchronous write as a separate segment.
+        /// </summary>
+        private class MockStream : Stream
+        {
+            private readonly SequenceBuilder<byte> _builder;
+
+            public MockStream() => _builder = new SequenceBuilder<byte>();
+            public override bool CanRead => throw new NotImplementedException();
+            public override bool CanSeek => throw new NotImplementedException();
+            public override bool CanWrite => true;
+            public override long Length => throw new NotImplementedException();
+            public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
+            public override void Flush() => throw new NotImplementedException();
+            public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+            public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+            public override void SetLength(long value) => throw new NotImplementedException();
+
+            // The caller may reuse 'buffer' once Write returns (ChunkingPipeline reuses its internal buffer),
+            // so the written bytes are copied instead of being referenced.
+            public override void Write(byte[] buffer, int offset, int count) => _builder.Append(buffer.AsSpan(offset, count).ToArray());
+
+            public ReadOnlySequence<byte> GetReadOnlySequence() => _builder.Build();
+        }
+    }
+}