You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/09/14 19:38:02 UTC

[qpid-proton-dotnet] branch main updated: PROTON-2611 Allow for configuration of a custom delivery tag generator

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git


The following commit(s) were added to refs/heads/main by this push:
     new a2ac93c  PROTON-2611 Allow for configuration of a custom delivery tag generator
a2ac93c is described below

commit a2ac93c36f7cbbbac00c38d084db62b76da269a2
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Sep 14 15:37:11 2022 -0400

    PROTON-2611 Allow for configuration of a custom delivery tag generator
    
    Allows the SenderOptions and StreamSenderOptions to convey a supplier to
    the sender which creates an IDeliveryTagGenerator for each sender upon
    creation.
---
 .../Client/Implementation/ClientSenderBuilder.cs   |  21 +++-
 src/Proton.Client/Client/SenderOptions.cs          |  19 +++
 src/Proton.Client/Client/Utilities/Objects.cs      |   4 +-
 .../Client/Implementation/ClientSenderTest.cs      | 132 +++++++++++++++++++++
 .../Implementation/ClientStreamSenderTest.cs       | 131 ++++++++++++++++++++
 5 files changed, 302 insertions(+), 5 deletions(-)

diff --git a/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs b/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs
index 0074dc3..aedaee1 100644
--- a/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs
@@ -16,6 +16,8 @@
  */
 
 using Apache.Qpid.Proton.Client.Concurrent;
+using Apache.Qpid.Proton.Client.Utilities;
+using Apache.Qpid.Proton.Engine;
 using Apache.Qpid.Proton.Engine.Implementation;
 using Apache.Qpid.Proton.Types.Messaging;
 using Apache.Qpid.Proton.Types.Transactions;
@@ -124,14 +126,25 @@ namespace Apache.Qpid.Proton.Client.Implementation
          protonSender.Target = CreateTarget(address, options);
          protonSender.Source = CreateSource(senderId, options);
 
-         // Use a tag generator that will reuse old tags.  Later we might make this configurable.
-         if (protonSender.SenderSettleMode == SenderSettleMode.Settled)
+         IDeliveryTagGenerator tagGenerator = options.DeliveryTagGeneratorSupplier == null ? null :
+            Objects.RequireNonNull(options.DeliveryTagGeneratorSupplier(),
+               "Cannot assign a null tag generator from a custom supplier");
+
+         if (tagGenerator == null)
          {
-            protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Empty.NewTagGenerator();
+            // No custom supplier was configured so use a default tag generator that will reuse old tags.
+            if (protonSender.SenderSettleMode == SenderSettleMode.Settled)
+            {
+               protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Empty.NewTagGenerator();
+            }
+            else
+            {
+               protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+            }
          }
          else
          {
-            protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+            protonSender.DeliveryTagGenerator = tagGenerator;
          }
 
          return protonSender;
diff --git a/src/Proton.Client/Client/SenderOptions.cs b/src/Proton.Client/Client/SenderOptions.cs
index 864cc35..91cb858 100644
--- a/src/Proton.Client/Client/SenderOptions.cs
+++ b/src/Proton.Client/Client/SenderOptions.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using Apache.Qpid.Proton.Engine;
 
 namespace Apache.Qpid.Proton.Client
 {
@@ -56,6 +57,8 @@ namespace Apache.Qpid.Proton.Client
          other.RequestTimeout = RequestTimeout;
          other.OpenTimeout = OpenTimeout;
          other.CloseTimeout = CloseTimeout;
+         other.DeliveryTagGeneratorSupplier = DeliveryTagGeneratorSupplier;
+
          if (OfferedCapabilities != null && OfferedCapabilities.Length > 0)
          {
             string[] copyOf = new string[OfferedCapabilities.Length];
@@ -149,5 +152,21 @@ namespace Apache.Qpid.Proton.Client
       /// </summary>
       public IDictionary<string, object> Properties { get; set; }
 
+      /// <summary>
+      /// Configures a supplier that provides the Delivery Tag Generator instance which the
+      /// Sender created using these options will use when setting the delivery tag on outgoing
+      /// deliveries.
+      /// </summary>
+      /// <remarks>
+      /// The client sender will use a default delivery tag generator unless a supplier is
+      /// configured here, so setting this option is not required. In some cases a
+      /// client application may want to control exactly what form of delivery tag is used
+      /// on outgoing deliveries, and this mechanism provides that control. The caller is
+      /// responsible for providing a supplier that returns a unique instance of a tag
+      /// generator for each sender created with this options instance, as tag generators
+      /// are not meant to be shared resources.
+      /// </remarks>
+      public Func<IDeliveryTagGenerator> DeliveryTagGeneratorSupplier { get; set; }
+
    }
 }
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Utilities/Objects.cs b/src/Proton.Client/Client/Utilities/Objects.cs
index de60349..ce05546 100644
--- a/src/Proton.Client/Client/Utilities/Objects.cs
+++ b/src/Proton.Client/Client/Utilities/Objects.cs
@@ -34,12 +34,14 @@ namespace Apache.Qpid.Proton.Client.Utilities
       /// <param name="value">The value to check for null</param>
       /// <param name="errorMessage">The message to supply when throwing an error</param>
       /// <exception cref="ArgumentNullException">If the given value is null</exception>
-      public static void RequireNonNull(object value, string errorMessage)
+      public static T RequireNonNull<T>(T value, string errorMessage)
       {
          if (value == null)
          {
             throw new ArgumentNullException(errorMessage ?? "Value provided cannot be null");
          }
+
+         return value;
       }
    }
 }
\ No newline at end of file
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
index 786f7e3..8a349ad 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
@@ -28,6 +28,8 @@ using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Messaging;
 using System.Threading.Tasks;
 using Apache.Qpid.Proton.Test.Driver.Codec.Messaging;
 using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Transport;
+using Apache.Qpid.Proton.Engine;
+using Apache.Qpid.Proton.Types;
 
 namespace Apache.Qpid.Proton.Client.Implementation
 {
@@ -3125,5 +3127,135 @@ namespace Apache.Qpid.Proton.Client.Implementation
             peer.WaitForScriptToComplete();
          }
       }
+
+      private static IDeliveryTagGenerator CustomTagGenerator()
+      {
+         return new CustomDeliveryTagGenerator();
+      }
+
+      private static IDeliveryTagGenerator CustomNullTagGenerator()
+      {
+         return null;
+      }
+
+      private class CustomDeliveryTagGenerator : IDeliveryTagGenerator
+      {
+         private int count = 1;
+
+         IDeliveryTag IDeliveryTagGenerator.NextTag()
+         {
+            switch (count++)
+            {
+               case 1:
+                  return new DeliveryTag(new byte[] { 1, 1, 1 });
+               case 2:
+                  return new DeliveryTag(new byte[] { 2, 2, 2 });
+               case 3:
+                  return new DeliveryTag(new byte[] { 3, 3, 3 });
+               default:
+                  throw new InvalidOperationException("Only supports creating three tags");
+            }
+         }
+      }
+
+      [Test]
+      public void TestSenderUsesCustomDeliveryTagGeneratorConfiguration()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.RemoteFlow().WithLinkCredit(10).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+            ISession session = connection.OpenSession().OpenTask.Result;
+
+            SenderOptions options = new SenderOptions()
+            {
+               DeliveryMode = DeliveryMode.AtLeastOnce,
+               AutoSettle = true,
+               DeliveryTagGeneratorSupplier = CustomTagGenerator
+            };
+            ISender sender = session.OpenSender("test-tags", options).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectTransfer().WithNonNullPayload()
+                                 .WithDeliveryTag(new byte[] { 1, 1, 1 }).Respond().WithSettled(true).WithState().Accepted();
+            peer.ExpectTransfer().WithNonNullPayload()
+                                 .WithDeliveryTag(new byte[] { 2, 2, 2 }).Respond().WithSettled(true).WithState().Accepted();
+            peer.ExpectTransfer().WithNonNullPayload()
+                                 .WithDeliveryTag(new byte[] { 3, 3, 3 }).Respond().WithSettled(true).WithState().Accepted();
+            peer.ExpectDetach().Respond();
+            peer.ExpectClose().Respond();
+
+            IMessage<string> message = IMessage<string>.Create("Hello World");
+            ITracker tracker1 = sender.Send(message);
+            ITracker tracker2 = sender.Send(message);
+            ITracker tracker3 = sender.Send(message);
+
+            Assert.IsNotNull(tracker1);
+            Assert.IsNotNull(tracker1.SettlementTask.Result);
+            Assert.IsNotNull(tracker2);
+            Assert.IsNotNull(tracker2.SettlementTask.Result);
+            Assert.IsNotNull(tracker3);
+            Assert.IsNotNull(tracker3.SettlementTask.Result);
+
+            sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+            connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void TestCannotCreateSenderWhenTagGeneratorReturnsNull()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectClose().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+            ISession session = connection.OpenSession().OpenTask.Result;
+            SenderOptions options = new SenderOptions()
+            {
+               DeliveryMode = DeliveryMode.AtLeastOnce,
+               AutoSettle = true,
+               DeliveryTagGeneratorSupplier = CustomNullTagGenerator
+            };
+
+            try
+            {
+               _ = session.OpenSender("test-tags", options).OpenTask.Result;
+               Assert.Fail("Should not create a sender if the tag generator is not supplied");
+            }
+            catch (ClientException)
+            {
+               // Expected
+            }
+
+            connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+            peer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
index cc4fa31..4054e68 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
@@ -29,6 +29,8 @@ using System.Linq;
 using System.IO;
 using System.Threading.Tasks;
 using System.Text;
+using Apache.Qpid.Proton.Engine;
+using Apache.Qpid.Proton.Types;
 
 namespace Apache.Qpid.Proton.Client.Implementation
 {
@@ -3119,5 +3121,134 @@ namespace Apache.Qpid.Proton.Client.Implementation
             peer.WaitForScriptToComplete();
          }
       }
+
+      private static IDeliveryTagGenerator CustomTagGenerator()
+      {
+         return new CustomDeliveryTagGenerator();
+      }
+
+      private static IDeliveryTagGenerator CustomNullTagGenerator()
+      {
+         return null;
+      }
+
+      private class CustomDeliveryTagGenerator : IDeliveryTagGenerator
+      {
+         private int count = 1;
+
+         IDeliveryTag IDeliveryTagGenerator.NextTag()
+         {
+            switch (count++)
+            {
+               case 1:
+                  return new DeliveryTag(new byte[] { 1, 1, 1 });
+               case 2:
+                  return new DeliveryTag(new byte[] { 2, 2, 2 });
+               case 3:
+                  return new DeliveryTag(new byte[] { 3, 3, 3 });
+               default:
+                  throw new InvalidOperationException("Only supports creating three tags");
+            }
+         }
+      }
+
+      [Test]
+      public void TestSenderUsesCustomDeliveryTagGeneratorConfiguration()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectAttach().OfSender().Respond();
+            peer.RemoteFlow().WithLinkCredit(10).Queue();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+
+            StreamSenderOptions options = new StreamSenderOptions()
+            {
+               DeliveryMode = DeliveryMode.AtLeastOnce,
+               AutoSettle = true,
+               DeliveryTagGeneratorSupplier = CustomTagGenerator
+            };
+            IStreamSender sender = connection.OpenStreamSender("test-tags", options).OpenTask.Result;
+
+            peer.WaitForScriptToComplete();
+            peer.ExpectTransfer().WithNonNullPayload()
+                                 .WithDeliveryTag(new byte[] { 1, 1, 1 }).Respond().WithSettled(true).WithState().Accepted();
+            peer.ExpectTransfer().WithNonNullPayload()
+                                 .WithDeliveryTag(new byte[] { 2, 2, 2 }).Respond().WithSettled(true).WithState().Accepted();
+            peer.ExpectTransfer().WithNonNullPayload()
+                                 .WithDeliveryTag(new byte[] { 3, 3, 3 }).Respond().WithSettled(true).WithState().Accepted();
+            peer.ExpectDetach().Respond();
+            peer.ExpectEnd().Respond();
+            peer.ExpectClose().Respond();
+
+            IMessage<string> message = IMessage<string>.Create("Hello World");
+            IStreamTracker tracker1 = sender.Send(message);
+            IStreamTracker tracker2 = sender.Send(message);
+            IStreamTracker tracker3 = sender.Send(message);
+
+            Assert.IsNotNull(tracker1);
+            Assert.IsNotNull(tracker1.SettlementTask.Result);
+            Assert.IsNotNull(tracker2);
+            Assert.IsNotNull(tracker2.SettlementTask.Result);
+            Assert.IsNotNull(tracker3);
+            Assert.IsNotNull(tracker3.SettlementTask.Result);
+
+            sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+            connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+            peer.WaitForScriptToComplete();
+         }
+      }
+
+      [Test]
+      public void TestCannotCreateSenderWhenTagGeneratorReturnsNull()
+      {
+         using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+         {
+            peer.ExpectSASLAnonymousConnect();
+            peer.ExpectOpen().Respond();
+            peer.ExpectBegin().Respond();
+            peer.ExpectClose().Respond();
+            peer.Start();
+
+            string remoteAddress = peer.ServerAddress;
+            int remotePort = peer.ServerPort;
+
+            logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+            IClient container = IClient.Create();
+            IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+            StreamSenderOptions options = new StreamSenderOptions()
+            {
+               DeliveryMode = DeliveryMode.AtLeastOnce,
+               AutoSettle = true,
+               DeliveryTagGeneratorSupplier = CustomNullTagGenerator
+            };
+
+            try
+            {
+               _ = connection.OpenStreamSender("test-tags", options).OpenTask.Result;
+               Assert.Fail("Should not create a sender if the tag generator is not supplied");
+            }
+            catch (ClientException)
+            {
+               // Expected
+            }
+
+            connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+            peer.WaitForScriptToComplete();
+         }
+      }
    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org