You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/11/07 20:37:14 UTC

[qpid-proton-dotnet] branch main updated: PROTON-2649 Update sender sendable state on check to reflect state

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new ab43220  PROTON-2649 Update sender sendable state on check to reflect state
ab43220 is described below

commit ab43220acf92d14d3a41fb21e1f668edd7c4c43a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Mon Nov 7 15:35:39 2022 -0500

    PROTON-2649 Update sender sendable state on check to reflect state
    
    When a sender is asked whether it is sendable, update its state if the
    session capacity reports that it is not available while the sender link
    still has credit.  This allows the engine to notify all senders in the
    not-sendable state that they have recovered.
---
 src/Proton/Engine/Implementation/ProtonSender.cs   |   2 +-
 .../Engine/Implementation/ProtonSessionTest.cs     | 184 +++++++++++++++++++++
 2 files changed, 185 insertions(+), 1 deletion(-)

diff --git a/src/Proton/Engine/Implementation/ProtonSender.cs b/src/Proton/Engine/Implementation/ProtonSender.cs
index 9e251b9..ecef356 100644
--- a/src/Proton/Engine/Implementation/ProtonSender.cs
+++ b/src/Proton/Engine/Implementation/ProtonSender.cs
@@ -54,7 +54,7 @@ namespace Apache.Qpid.Proton.Engine.Implementation
 
       public override uint Credit => CreditState.Credit;
 
-      public bool IsSendable => sendable && sessionWindow.IsSendable;
+      public bool IsSendable => sendable = sendable && sessionWindow.IsSendable;
 
       public override bool IsDraining => CreditState.IsDrain;
 
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
index 805814d..edac887 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonSessionTest.cs
@@ -2779,6 +2779,190 @@ namespace Apache.Qpid.Proton.Engine.Implementation
          Assert.IsNull(failure);
       }
 
+      [Test]
+      public void TestBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState()
+      {
+         IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+         engine.ErrorHandler((error) => failure = error.FailureCause);
+         Queue<Action> asyncIOCallbacks = new Queue<Action>();
+         ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
+
+         byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
+         IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+
+         peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+         peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
+         peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
+         peer.ExpectAttach().Respond();
+         peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+         peer.ExpectAttach().Respond();
+         peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+
+         IConnection connection = engine.Start();
+         connection.MaxFrameSize = 1024;
+         connection.Open();
+         ISession session = connection.Session();
+         session.OutgoingCapacity = 1024;
+         session.Open();
+         ISender sender1 = session.Sender("test1");
+         sender1.DeliveryTagGenerator = generator;
+         sender1.Open();
+         ISender sender2 = session.Sender("test2");
+         sender2.DeliveryTagGenerator = generator;
+         sender2.Open();
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectTransfer().WithPayload(payload);
+
+         int sender1CreditStateUpdated = 0;
+         sender1.CreditStateUpdateHandler((self) =>
+         {
+            sender1CreditStateUpdated++;
+         });
+
+         int sender2CreditStateUpdated = 0;
+         sender2.CreditStateUpdateHandler((self) =>
+         {
+            sender2CreditStateUpdated++;
+         });
+
+         Assert.IsTrue(sender1.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.IsTrue(sender2.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+
+         // Open, Begin, Attach, Attach
+         Assert.AreEqual(4, asyncIOCallbacks.Count);
+         foreach (Action action in asyncIOCallbacks)
+         {
+            action.Invoke();
+         }
+         asyncIOCallbacks.Clear();
+
+         IOutgoingDelivery delivery = sender1.Next();
+         delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+
+         peer.WaitForScriptToComplete();
+
+         Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+         Assert.IsFalse(sender1.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         // Sender 2 shouldn't be able to send since sender 1 consumed the outgoing window
+         Assert.IsFalse(sender2.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+
+         // Free a frame's worth of window which should trigger both senders sendable update event
+         asyncIOCallbacks.Dequeue().Invoke();
+         Assert.AreEqual(0, asyncIOCallbacks.Count);
+
+         Assert.IsTrue(sender1.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.AreEqual(1, sender1CreditStateUpdated);
+         Assert.IsTrue(sender2.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.AreEqual(1, sender2CreditStateUpdated);
+
+         peer.WaitForScriptToComplete();
+         Assert.IsNull(failure);
+      }
+
+      [Test]
+      public void TestSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow()
+      {
+         IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+         engine.ErrorHandler((error) => failure = error.FailureCause);
+         Queue<Action> asyncIOCallbacks = new Queue<Action>();
+         ProtonTestConnector peer = CreateTestPeer(engine, asyncIOCallbacks);
+
+         byte[] payload = new byte[] { 0, 1, 2, 3, 4 };
+         IDeliveryTagGenerator generator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+
+         peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+         peer.ExpectOpen().WithMaxFrameSize(1024).Respond();
+         peer.ExpectBegin().WithNextOutgoingId(0).Respond().WithNextOutgoingId(0);
+         peer.ExpectAttach().Respond();
+         peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+         peer.ExpectAttach().Respond();
+         peer.RemoteFlow().WithLinkCredit(20).WithNextIncomingId(0).WithIncomingWindow(8192).Queue();
+
+         IConnection connection = engine.Start();
+         connection.MaxFrameSize = 1024;
+         connection.Open();
+         ISession session = connection.Session();
+         session.OutgoingCapacity = 1024;
+         session.Open();
+         ISender sender1 = session.Sender("test1");
+         sender1.DeliveryTagGenerator = generator;
+         sender1.Open();
+         ISender sender2 = session.Sender("test2");
+         sender2.DeliveryTagGenerator = generator;
+         sender2.Open();
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectTransfer().WithPayload(payload);
+
+         int sender1CreditStateUpdated = 0;
+         sender1.CreditStateUpdateHandler((self) =>
+         {
+            sender1CreditStateUpdated++;
+            if (self.IsSendable)
+            {
+               IOutgoingDelivery delivery = self.Next();
+               delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+            }
+         });
+
+         int sender2CreditStateUpdated = 0;
+         sender2.CreditStateUpdateHandler((self) =>
+         {
+            sender2CreditStateUpdated++;
+         });
+
+         Assert.IsTrue(sender1.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+         Assert.IsTrue(sender2.IsSendable);
+         Assert.AreEqual(1024, session.RemainingOutgoingCapacity);
+
+         // Open, Begin, Attach, Attach
+         Assert.AreEqual(4, asyncIOCallbacks.Count);
+         foreach (Action action in asyncIOCallbacks)
+         {
+            action.Invoke();
+         }
+         asyncIOCallbacks.Clear();
+
+         IOutgoingDelivery delivery = sender1.Next();
+         delivery.WriteBytes(ProtonByteBufferAllocator.Instance.Wrap(payload));
+
+         peer.WaitForScriptToComplete();
+         peer.ExpectTransfer().WithPayload(payload);
+
+         Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+         Assert.IsFalse(sender1.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         // Sender 2 shouldn't be able to send since sender 1 consumed the outgoing window
+         Assert.IsFalse(sender2.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+
+         // Should trigger sender 1 to send which should exhaust the outgoing credit
+         asyncIOCallbacks.Dequeue().Invoke();
+         Assert.AreEqual(1, asyncIOCallbacks.Count);
+
+         Assert.IsFalse(sender1.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         Assert.AreEqual(1, sender1CreditStateUpdated);
+         Assert.IsFalse(sender2.IsSendable);
+         Assert.AreEqual(0, session.RemainingOutgoingCapacity);
+         // Should not have triggered an event for sender 2 being able to send since
+         // sender one consumed the outgoing window already.
+         Assert.AreEqual(0, sender2CreditStateUpdated);
+
+         peer.WaitForScriptToComplete();
+         Assert.IsNull(failure);
+      }
+
       [Test]
       public void TestHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives()
       {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org