You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/07/06 07:56:07 UTC

[pulsar-dotpulsar] branch master updated: Allow for concurrent message production (#23)

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c4acb  Allow for concurrent message production (#23)
a1c4acb is described below

commit a1c4acbd9610998676ecdfbcd911bfc80340ebc1
Author: Chickenzilla <Ch...@users.noreply.github.com>
AuthorDate: Mon Jul 6 00:55:40 2020 -0700

    Allow for concurrent message production (#23)
    
    * Fix use of modified SequenceId
    
    * Clean up SequenceId initial checking
---
 src/DotPulsar/Internal/ProducerChannel.cs        | 11 +++++------
 src/DotPulsar/Internal/RequestResponseHandler.cs | 24 +++++++++++------------
 src/DotPulsar/Internal/SequenceId.cs             | 25 +++++++++++++++++-------
 3 files changed, 35 insertions(+), 25 deletions(-)

diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 824ed87..0ba8cd1 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -90,8 +90,9 @@ namespace DotPulsar.Internal
 
                 if (autoAssignSequenceId)
                 {
-                    sendPackage.Command.SequenceId = _sequenceId.Current;
-                    sendPackage.Metadata.SequenceId = _sequenceId.Current;
+                    var newSequenceId = _sequenceId.FetchNext();
+                    sendPackage.Command.SequenceId = newSequenceId;
+                    sendPackage.Metadata.SequenceId = newSequenceId;
                 }
                 else
                     sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
@@ -99,9 +100,6 @@ namespace DotPulsar.Internal
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
-                if (autoAssignSequenceId)
-                    _sequenceId.Increment();
-
                 return response.SendReceipt;
             }
             finally
@@ -115,3 +113,4 @@ namespace DotPulsar.Internal
         }
     }
 }
+
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c023c56..c855337 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@ namespace DotPulsar.Internal
         private const string ConnectResponseIdentifier = "Connected";
 
         private readonly Awaiter<string, BaseCommand> _responses;
-        private ulong _requestId;
+        private SequenceId _requestId;
 
         public RequestResponseHandler()
         {
             _responses = new Awaiter<string, BaseCommand>();
-            _requestId = 1;
+            _requestId = new SequenceId(1);
         }
 
         public void Dispose()
@@ -53,31 +53,31 @@ namespace DotPulsar.Internal
             switch (cmd.CommandType)
             {
                 case BaseCommand.Type.Seek:
-                    cmd.Seek.RequestId = _requestId++;
+                    cmd.Seek.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Lookup:
-                    cmd.LookupTopic.RequestId = _requestId++;
+                    cmd.LookupTopic.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Error:
-                    cmd.Error.RequestId = _requestId++;
+                    cmd.Error.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Producer:
-                    cmd.Producer.RequestId = _requestId++;
+                    cmd.Producer.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.CloseProducer:
-                    cmd.CloseProducer.RequestId = _requestId++;
+                    cmd.CloseProducer.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Subscribe:
-                    cmd.Subscribe.RequestId = _requestId++;
+                    cmd.Subscribe.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.Unsubscribe:
-                    cmd.Unsubscribe.RequestId = _requestId++;
+                    cmd.Unsubscribe.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.CloseConsumer:
-                    cmd.CloseConsumer.RequestId = _requestId++;
+                    cmd.CloseConsumer.RequestId = _requestId.FetchNext();
                     return;
                 case BaseCommand.Type.GetLastMessageId:
-                    cmd.GetLastMessageId.RequestId = _requestId++;
+                    cmd.GetLastMessageId.RequestId = _requestId.FetchNext();
                     return;
             }
         }
@@ -90,7 +90,7 @@ namespace DotPulsar.Internal
                 BaseCommand.Type.Send => $"{cmd.Send.ProducerId}-{cmd.Send.SequenceId}",
                 BaseCommand.Type.SendError => $"{cmd.SendError.ProducerId}-{cmd.SendError.SequenceId}",
                 BaseCommand.Type.SendReceipt => $"{cmd.SendReceipt.ProducerId}-{cmd.SendReceipt.SequenceId}",
-                BaseCommand.Type.Error => _requestId == 1 ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString(),
+                BaseCommand.Type.Error => !_requestId.IsPastInitialId() ? ConnectResponseIdentifier : cmd.Error.RequestId.ToString(),
                 BaseCommand.Type.Producer => cmd.Producer.RequestId.ToString(),
                 BaseCommand.Type.ProducerSuccess => cmd.ProducerSuccess.RequestId.ToString(),
                 BaseCommand.Type.CloseProducer => cmd.CloseProducer.RequestId.ToString(),
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 5b41d7d..55ba94a 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,21 +12,32 @@
  * limitations under the License.
  */
 
+using System.Threading;
+
 namespace DotPulsar.Internal
 {
     public sealed class SequenceId
     {
+        private long _current;
+        private ulong _initial;
+
         public SequenceId(ulong initialSequenceId)
         {
-            Current = initialSequenceId;
-
-            if (initialSequenceId > 0)
-                Increment();
+            // Subtracting one because Interlocked.Increment will return the post-incremented value
+            // which is expected to be the initialSequenceId for the first call
+            _current = unchecked((long)initialSequenceId - 1);
+            _initial = initialSequenceId - 1;
         }
 
-        public ulong Current { get; private set; }
+        // Returns false if FetchNext has not been called on this object before (or if it somehow wrapped around 2^64)
+        public bool IsPastInitialId()
+        {
+            return unchecked((ulong)_current != _initial);
+        }
 
-        public void Increment()
-            => ++Current;
+        public ulong FetchNext()
+        {
+            return unchecked((ulong)Interlocked.Increment(ref _current));
+        }
     }
 }