You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/05/03 19:02:39 UTC

[qpid-proton-dotnet] branch main updated: PROTON-2722 Race during receiver queue count can break next receiver

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cbcf3a  PROTON-2722 Race during receiver queue count can break next receiver
2cbcf3a is described below

commit 2cbcf3a72f46445d4d50059547802d2cd93fe67c
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 3 14:43:08 2023 -0400

    PROTON-2722 Race during receiver queue count can break next receiver
    
    A race on the check of the queue backlog for receivers can break the next
    receiver API and can also throw an error from the public queue count API
    due to concurrent updates. This fixes a very intermittent CI test failure
    and makes the public queued deliveries API stable against concurrent access.
---
 .../Implementation/ClientNextReceiverSelector.cs   | 14 ++++++------
 .../Implementation/ClientReceiverLinkType.cs       | 25 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
index e9dfff7..751f547 100644
--- a/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
+++ b/src/Proton.Client/Client/Implementation/ClientNextReceiverSelector.cs
@@ -142,7 +142,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private ClientReceiver SelectRandomReceiver()
       {
          IEnumerable<Engine.IReceiver> receivers = session.ProtonSession.Receivers.
-            Where(r => r.LinkedResource is ClientReceiver receiver && receiver.QueuedDeliveries > 0);
+            Where(r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0);
 
          Engine.IReceiver receiver = receivers.ElementAtOrDefault(random.Next(0, receivers.Count()));
 
@@ -163,7 +163,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
                {
                   if (foundLast)
                   {
-                     if (candidate.QueuedDeliveries > 0)
+                     if (candidate.GetQueuedDeliveries() > 0)
                      {
                         result = candidate;
                      }
@@ -187,7 +187,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       {
          Engine.IReceiver receiver =
             session.ProtonSession.Receivers.Where(
-               r => r.LinkedResource is ClientReceiver receiver && receiver.QueuedDeliveries > 0).FirstOrDefault();
+               r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0).FirstOrDefault();
 
          return (ClientReceiver)receiver?.LinkedResource;
       }
@@ -195,7 +195,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private ClientReceiver SelectLargestBacklog()
       {
          IEnumerable<Engine.IReceiver> receivers = session.ProtonSession.Receivers.
-            Where(r => r.LinkedResource is ClientReceiver receiver && receiver.QueuedDeliveries > 0);
+            Where(r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0);
 
          ClientReceiver result = null;
 
@@ -203,7 +203,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             ClientReceiver candidate = (ClientReceiver)receiver.LinkedResource;
 
-            if (result == null || result.QueuedDeliveries < candidate.QueuedDeliveries)
+            if (result == null || result.GetQueuedDeliveries() < candidate.GetQueuedDeliveries())
             {
                result = candidate;
             }
@@ -215,7 +215,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
       private ClientReceiver SelectSmallestBacklog()
       {
          IEnumerable<Engine.IReceiver> receivers = session.ProtonSession.Receivers.
-            Where(r => r.LinkedResource is ClientReceiver receiver && receiver.QueuedDeliveries > 0);
+            Where(r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0);
 
          ClientReceiver result = null;
 
@@ -223,7 +223,7 @@ namespace Apache.Qpid.Proton.Client.Implementation
          {
             ClientReceiver candidate = (ClientReceiver)receiver.LinkedResource;
 
-            if (result == null || result.QueuedDeliveries > candidate.QueuedDeliveries)
+            if (result == null || result.GetQueuedDeliveries() > candidate.GetQueuedDeliveries())
             {
                result = candidate;
             }
diff --git a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
index 72f41d0..9db0b19 100644
--- a/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
+++ b/src/Proton.Client/Client/Implementation/ClientReceiverLinkType.cs
@@ -49,7 +49,30 @@ namespace Apache.Qpid.Proton.Client.Implementation
          }
       }
 
-      public virtual int QueuedDeliveries => protonLink.Unsettled.Count(delivery => delivery.LinkedResource == null);
+      internal int GetQueuedDeliveries()
+      {
+         // Internal implementation operates on the current thread which should generally
+         // be the connection execution thread.
+         return protonLink.Unsettled.Count(delivery => delivery.LinkedResource == null);
+      }
+
+      public virtual int QueuedDeliveries
+      {
+         get
+         {
+            CheckClosedOrFailed();
+            TaskCompletionSource<int> queuedCount = new();
+            session.Execute(() =>
+            {
+               if (NotClosedOrFailed(queuedCount))
+               {
+                  _ = queuedCount.TrySetResult(GetQueuedDeliveries());
+               }
+            });
+
+            return queuedCount.Task.ConfigureAwait(false).GetAwaiter().GetResult();
+         }
+      }
 
       public LinkType AddCredit(uint credit)
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org