You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/04/30 05:34:07 UTC
[pulsar-dotpulsar] branch master updated: Fixing two bugs. Make
ready for 1.0.2
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 346e606 Fixing two bugs. Make ready for 1.0.2
346e606 is described below
commit 346e606a52c9ea398199ab49ce78a05104114afa
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Fri Apr 30 07:33:13 2021 +0200
Fixing two bugs.
Make ready for 1.0.2
---
CHANGELOG.md | 7 ++++
src/DotPulsar/DotPulsar.csproj | 2 +-
src/DotPulsar/Internal/Channel.cs | 11 ++++-
.../Extensions/ReadOnlySequenceExtensions.cs | 2 +-
src/DotPulsar/Internal/Producer.cs | 1 +
.../Extensions/ReadOnlySequenceExtensionsTests.cs | 47 ++++++++++++++++------
6 files changed, 55 insertions(+), 15 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5e998e..eada720 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [1.0.2] - 2021-04-30
+
+### Fixed
+
+- Closing a consumer or reader while the broker is streaming messages could take down the connection causing other consumers, readers, and producers of the connection to reconnect
+- In some circumstances, the protocol bytes could be misread leading to wrong messages parsing
+
## [1.0.1] - 2021-03-30
### Fixed
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 8f6cd7e..fc72713 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
- <Version>1.0.1</Version>
+ <Version>1.0.2</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index b441680..eb67755 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -35,7 +35,16 @@ namespace DotPulsar.Internal
}
public void Received(MessagePackage message)
- => _enqueue.Enqueue(message);
+ {
+ try
+ {
+ _enqueue.Enqueue(message);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
public void Activated()
=> _eventRegister.Register(new ChannelActivated(_correlationId));
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index ea5fa12..21994fb 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -88,7 +88,7 @@ namespace DotPulsar.Internal.Extensions
}
}
- if (read == 3)
+ if (read == 4)
break;
start = 0;
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 5264b25..b130318 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -91,6 +91,7 @@ namespace DotPulsar.Internal
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
+
public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
=> await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
index b84268c..c4d9df6 100644
--- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -127,34 +127,57 @@ namespace DotPulsar.Tests.Internal.Extensions
actual.Should().Be(expected);
}
- [Fact]
- public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult()
+#pragma warning disable xUnit1025 // Miscoded warning can't tell these are all different
+ [Theory]
+ [InlineData(new byte[] { }, new byte[] { 0x02, 0x03, 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04, 0x05 }, new byte[] { })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { }, new byte[] { 0x03, 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { }, new byte[] { 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05, 0x09 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04, 0x05 }, new byte[] { 0x09 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04, 0x05 }, new byte[] { 0x09 })]
+#pragma warning restore xUnit1025 // InlineData should be unique within the Theory it belongs to
+ public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult(params byte[][] testPath)
{
//Arrange
- var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+ var sequenceBuilder = new SequenceBuilder<byte>();
+ foreach (var array in testPath)
+ sequenceBuilder.Append(array);
+ var sequence = sequenceBuilder.Build();
//Act
var actual = sequence.ReadUInt32(0, true);
//Assert
- const uint expected = 66051;
+ const uint expected = 0x02030405;
actual.Should().Be(expected);
}
- [Fact]
- public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult()
+ [Theory]
+ [InlineData(2, new byte[] { 0x09, 0x09, 0x02 }, new byte[] { 0x03, 0x04, 0x05 }, new byte[] { 0x09, 0x09, 0x09 })]
+ [InlineData(3, new byte[] { 0x09, 0x09, 0x09 }, new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05, 0x09, 0x09 })]
+ [InlineData(4, new byte[] { 0x09, 0x09, 0x09 }, new byte[] { 0x09, 0x02, 0x03 }, new byte[] { 0x04, 0x05, 0x09 })]
+ public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult(long start, params byte[][] testPath)
{
//Arrange
- var sequence = new SequenceBuilder<byte>()
- .Append(new byte[] { 0x09, 0x09, 0x09 })
- .Append(new byte[] { 0x09, 0x00, 0x01 })
- .Append(new byte[] { 0x02, 0x03 }).Build();
+ var sequenceBuilder = new SequenceBuilder<byte>();
+ foreach (var array in testPath)
+ sequenceBuilder.Append(array);
+ var sequence = sequenceBuilder.Build();
//Act
- var actual = sequence.ReadUInt32(4, true);
+ var actual = sequence.ReadUInt32(start, true);
//Assert
- const uint expected = 66051;
+ const uint expected = 0x02030405;
actual.Should().Be(expected);
}
}