You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/07/06 07:56:07 UTC
[pulsar-dotpulsar] branch master updated: Allow for concurrent message production (#23)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1c4acb Allow for concurrent message production (#23)
a1c4acb is described below
commit a1c4acbd9610998676ecdfbcd911bfc80340ebc1
Author: Chickenzilla <Ch...@users.noreply.github.com>
AuthorDate: Mon Jul 6 00:55:40 2020 -0700
Allow for concurrent message production (#23)
* Fix use of modified SequenceId
* Clean up SequenceId initial checking
---
src/DotPulsar/Internal/ProducerChannel.cs | 11 +++++------
src/DotPulsar/Internal/RequestResponseHandler.cs | 24 +++++++++++------------
src/DotPulsar/Internal/SequenceId.cs | 25 +++++++++++++++++-------
3 files changed, 35 insertions(+), 25 deletions(-)
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 824ed87..0ba8cd1 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -90,8 +90,9 @@ namespace DotPulsar.Internal
if (autoAssignSequenceId)
{
- sendPackage.Command.SequenceId = _sequenceId.Current;
- sendPackage.Metadata.SequenceId = _sequenceId.Current;
+ var newSequenceId = _sequenceId.FetchNext();
+ sendPackage.Command.SequenceId = newSequenceId;
+ sendPackage.Metadata.SequenceId = newSequenceId;
}
else
sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
@@ -99,9 +100,6 @@ namespace DotPulsar.Internal
var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
- if (autoAssignSequenceId)
- _sequenceId.Increment();
-
return response.SendReceipt;
}
finally
@@ -115,3 +113,4 @@ namespace DotPulsar.Internal
}
}
}
+
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c023c56..c855337 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@ namespace DotPulsar.Internal
private const string ConnectResponseIdentifier = "Connected";
private readonly Awaiter<string, BaseCommand> _responses;
- private ulong _requestId;
+ private SequenceId _requestId;
public RequestResponseHandler()
{
_responses = new Awaiter<string, BaseCommand>();
- _requestId = 1;
+ _requestId = new SequenceId(1);
}
public void Dispose()
@@ -53,31 +53,31 @@ namespace DotPulsar.Internal
switch (cmd.CommandType)
{
case BaseCommand.Type.Seek:
- cmd.Seek.RequestId = _requestId++;
+ cmd.Seek.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.Lookup:
- cmd.LookupTopic.RequestId = _requestId++;
+ cmd.LookupTopic.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.Error:
- cmd.Error.RequestId = _requestId++;
+ cmd.Error.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.Producer:
- cmd.Producer.RequestId = _requestId++;
+ cmd.Producer.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.CloseProducer:
- cmd.CloseProducer.RequestId = _requestId++;
+ cmd.CloseProducer.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.Subscribe:
- cmd.Subscribe.RequestId = _requestId++;
+ cmd.Subscribe.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.Unsubscribe:
- cmd.Unsubscribe.RequestId = _requestId++;
+ cmd.Unsubscribe.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.CloseConsumer:
- cmd.CloseConsumer.RequestId = _requestId++;
+ cmd.CloseConsumer.RequestId = _requestId.FetchNext();
return;
case BaseCommand.Type.GetLastMessageId:
- cmd.GetLastMessageId.RequestId = _requestId++;
+ cmd.GetLastMessageId.RequestId = _requestId.FetchNext();
return;
}
}
@@ -90,7 +90,7 @@ namespace DotPulsar.Internal
BaseCommand.Type.Send => $"{cmd.Send.ProducerId}-{cmd.Send.SequenceId}",
BaseCommand.Type.SendError => $"{cmd.SendError.ProducerId}-{cmd.SendError.SequenceId}",
BaseCommand.Type.SendReceipt => $"{cmd.SendReceipt.ProducerId}-{cmd.SendReceipt.SequenceId}",
- BaseCommand.Type.Error => _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString(),
+ BaseCommand.Type.Error => !_requestId.IsPastInitialId() ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString(),
BaseCommand.Type.Producer => cmd.Producer.RequestId.ToString(),
BaseCommand.Type.ProducerSuccess => cmd.ProducerSuccess.RequestId.ToString(),
BaseCommand.Type.CloseProducer => cmd.CloseProducer.RequestId.ToString(),
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 5b41d7d..55ba94a 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,21 +12,32 @@
* limitations under the License.
*/
+using System.Threading;
+
namespace DotPulsar.Internal
{
public sealed class SequenceId
{
+ private long _current;
+ private ulong _initial;
+
public SequenceId(ulong initialSequenceId)
{
- Current = initialSequenceId;
-
- if (initialSequenceId > 0)
- Increment();
+ // Subtracting one because Interlocked.Increment will return the post-incremented value
+ // which is expected to be the initialSequenceId for the first call
+ _current = unchecked((long)initialSequenceId - 1);
+ _initial = initialSequenceId - 1;
}
- public ulong Current { get; private set; }
+ // Returns false if FetchNext has not been called on this object before (or if it somehow wrapped around 2^64)
+ public bool IsPastInitialId()
+ {
+ return unchecked((ulong)_current != _initial);
+ }
- public void Increment()
- => ++Current;
+ public ulong FetchNext()
+ {
+ return unchecked((ulong)Interlocked.Increment(ref _current));
+ }
}
}