You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/09/14 19:38:02 UTC
[qpid-proton-dotnet] branch main updated: PROTON-2611 Allow for configuration of a custom delivery tag generator
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton-dotnet.git
The following commit(s) were added to refs/heads/main by this push:
new a2ac93c PROTON-2611 Allow for configuration of a custom delivery tag generator
a2ac93c is described below
commit a2ac93c36f7cbbbac00c38d084db62b76da269a2
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Sep 14 15:37:11 2022 -0400
PROTON-2611 Allow for configuration of a custom delivery tag generator
Allows the SenderOptions and StreamSenderOptions to convey a supplier to
the sender which creates an IDeliveryTagGenerator for each sender upon
creation.
---
.../Client/Implementation/ClientSenderBuilder.cs | 21 +++-
src/Proton.Client/Client/SenderOptions.cs | 19 +++
src/Proton.Client/Client/Utilities/Objects.cs | 4 +-
.../Client/Implementation/ClientSenderTest.cs | 132 +++++++++++++++++++++
.../Implementation/ClientStreamSenderTest.cs | 131 ++++++++++++++++++++
5 files changed, 302 insertions(+), 5 deletions(-)
diff --git a/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs b/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs
index 0074dc3..aedaee1 100644
--- a/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs
+++ b/src/Proton.Client/Client/Implementation/ClientSenderBuilder.cs
@@ -16,6 +16,8 @@
*/
using Apache.Qpid.Proton.Client.Concurrent;
+using Apache.Qpid.Proton.Client.Utilities;
+using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Engine.Implementation;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Types.Transactions;
@@ -124,14 +126,25 @@ namespace Apache.Qpid.Proton.Client.Implementation
protonSender.Target = CreateTarget(address, options);
protonSender.Source = CreateSource(senderId, options);
- // Use a tag generator that will reuse old tags. Later we might make this configurable.
- if (protonSender.SenderSettleMode == SenderSettleMode.Settled)
+ IDeliveryTagGenerator tagGenerator = options.DeliveryTagGeneratorSupplier == null ? null :
+ Objects.RequireNonNull(options.DeliveryTagGeneratorSupplier(),
+ "Cannot assign a null tag generator from a custom supplier");
+
+ if (tagGenerator == null)
{
- protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Empty.NewTagGenerator();
+ // Use a tag generator that will reuse old tags. Later we might make this configurable.
+ if (protonSender.SenderSettleMode == SenderSettleMode.Settled)
+ {
+ protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Empty.NewTagGenerator();
+ }
+ else
+ {
+ protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+ }
}
else
{
- protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
+ protonSender.DeliveryTagGenerator = tagGenerator;
}
return protonSender;
diff --git a/src/Proton.Client/Client/SenderOptions.cs b/src/Proton.Client/Client/SenderOptions.cs
index 864cc35..91cb858 100644
--- a/src/Proton.Client/Client/SenderOptions.cs
+++ b/src/Proton.Client/Client/SenderOptions.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using Apache.Qpid.Proton.Engine;
namespace Apache.Qpid.Proton.Client
{
@@ -56,6 +57,8 @@ namespace Apache.Qpid.Proton.Client
other.RequestTimeout = RequestTimeout;
other.OpenTimeout = OpenTimeout;
other.CloseTimeout = CloseTimeout;
+ other.DeliveryTagGeneratorSupplier = DeliveryTagGeneratorSupplier;
+
if (OfferedCapabilities != null && OfferedCapabilities.Length > 0)
{
string[] copyOf = new string[OfferedCapabilities.Length];
@@ -149,5 +152,21 @@ namespace Apache.Qpid.Proton.Client
/// </summary>
public IDictionary<string, object> Properties { get; set; }
+ /// <summary>
+ /// Configures a supplier that provides the Delivery Tag Generator instance which the
+ /// Sender created using these options will use when setting the delivery tag on outgoing
+ /// deliveries.
+ /// </summary>
+ /// <remarks>
+ /// The client sender will use a default delivery tag generator unless a supplier is
+ /// configured here, so this option is not required to be used. In some cases a
+ /// client application may want to control exactly what form of delivery tag is used
+ /// on outgoing deliveries and this mechanism provides that control. The caller is
+ /// responsible for providing a supplier that will provide a unique instance of a tag
+ /// generator for each sender created with this options instance, as the tag generators
+ /// are not meant to be shared resources.
+ /// </remarks>
+ public Func<IDeliveryTagGenerator> DeliveryTagGeneratorSupplier { get; set; }
+
}
}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Utilities/Objects.cs b/src/Proton.Client/Client/Utilities/Objects.cs
index de60349..ce05546 100644
--- a/src/Proton.Client/Client/Utilities/Objects.cs
+++ b/src/Proton.Client/Client/Utilities/Objects.cs
@@ -34,12 +34,14 @@ namespace Apache.Qpid.Proton.Client.Utilities
/// <param name="value">The value to check for null</param>
/// <param name="errorMessage">The message to supply when throwing an error</param>
/// <exception cref="ArgumentNullException">If the given value is null</exception>
- public static void RequireNonNull(object value, string errorMessage)
+ public static T RequireNonNull<T>(T value, string errorMessage)
{
if (value == null)
{
throw new ArgumentNullException(errorMessage ?? "Value provided cannot be null");
}
+
+ return value;
}
}
}
\ No newline at end of file
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
index 786f7e3..8a349ad 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
@@ -28,6 +28,8 @@ using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Messaging;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Test.Driver.Codec.Messaging;
using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Transport;
+using Apache.Qpid.Proton.Engine;
+using Apache.Qpid.Proton.Types;
namespace Apache.Qpid.Proton.Client.Implementation
{
@@ -3125,5 +3127,135 @@ namespace Apache.Qpid.Proton.Client.Implementation
peer.WaitForScriptToComplete();
}
}
+
+ private static IDeliveryTagGenerator CustomTagGenerator()
+ {
+ return new CustomDeliveryTagGenerator();
+ }
+
+ private static IDeliveryTagGenerator CustomNullTagGenerator()
+ {
+ return null;
+ }
+
+ private class CustomDeliveryTagGenerator : IDeliveryTagGenerator
+ {
+ private int count = 1;
+
+ IDeliveryTag IDeliveryTagGenerator.NextTag()
+ {
+ switch (count++)
+ {
+ case 1:
+ return new DeliveryTag(new byte[] { 1, 1, 1 });
+ case 2:
+ return new DeliveryTag(new byte[] { 2, 2, 2 });
+ case 3:
+ return new DeliveryTag(new byte[] { 3, 3, 3 });
+ default:
+ throw new InvalidOperationException("Only supports creating three tags");
+ }
+ }
+ }
+
+ [Test]
+ public void TestSenderUsesCustomDeliveryTagGeneratorConfiguration()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(10).Queue();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ SenderOptions options = new SenderOptions()
+ {
+ DeliveryMode = DeliveryMode.AtLeastOnce,
+ AutoSettle = true,
+ DeliveryTagGeneratorSupplier = CustomTagGenerator
+ };
+ ISender sender = session.OpenSender("test-tags", options).OpenTask.Result;
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectTransfer().WithNonNullPayload()
+ .WithDeliveryTag(new byte[] { 1, 1, 1 }).Respond().WithSettled(true).WithState().Accepted();
+ peer.ExpectTransfer().WithNonNullPayload()
+ .WithDeliveryTag(new byte[] { 2, 2, 2 }).Respond().WithSettled(true).WithState().Accepted();
+ peer.ExpectTransfer().WithNonNullPayload()
+ .WithDeliveryTag(new byte[] { 3, 3, 3 }).Respond().WithSettled(true).WithState().Accepted();
+ peer.ExpectDetach().Respond();
+ peer.ExpectClose().Respond();
+
+ IMessage<string> message = IMessage<string>.Create("Hello World");
+ ITracker tracker1 = sender.Send(message);
+ ITracker tracker2 = sender.Send(message);
+ ITracker tracker3 = sender.Send(message);
+
+ Assert.IsNotNull(tracker1);
+ Assert.IsNotNull(tracker1.SettlementTask.Result);
+ Assert.IsNotNull(tracker2);
+ Assert.IsNotNull(tracker2.SettlementTask.Result);
+ Assert.IsNotNull(tracker3);
+ Assert.IsNotNull(tracker3.SettlementTask.Result);
+
+ sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+ connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestCannotCreateSenderWhenTagGeneratorReturnsNull()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+ ISession session = connection.OpenSession().OpenTask.Result;
+ SenderOptions options = new SenderOptions()
+ {
+ DeliveryMode = DeliveryMode.AtLeastOnce,
+ AutoSettle = true,
+ DeliveryTagGeneratorSupplier = CustomNullTagGenerator
+ };
+
+ try
+ {
+ _ = session.OpenSender("test-tags", options).OpenTask.Result;
+ Assert.Fail("Should not create a sender if the tag generator is not supplied");
+ }
+ catch (ClientException)
+ {
+ // Expected
+ }
+
+ connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+ peer.WaitForScriptToComplete();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
index cc4fa31..4054e68 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientStreamSenderTest.cs
@@ -29,6 +29,8 @@ using System.Linq;
using System.IO;
using System.Threading.Tasks;
using System.Text;
+using Apache.Qpid.Proton.Engine;
+using Apache.Qpid.Proton.Types;
namespace Apache.Qpid.Proton.Client.Implementation
{
@@ -3119,5 +3121,134 @@ namespace Apache.Qpid.Proton.Client.Implementation
peer.WaitForScriptToComplete();
}
}
+
+ private static IDeliveryTagGenerator CustomTagGenerator()
+ {
+ return new CustomDeliveryTagGenerator();
+ }
+
+ private static IDeliveryTagGenerator CustomNullTagGenerator()
+ {
+ return null;
+ }
+
+ private class CustomDeliveryTagGenerator : IDeliveryTagGenerator
+ {
+ private int count = 1;
+
+ IDeliveryTag IDeliveryTagGenerator.NextTag()
+ {
+ switch (count++)
+ {
+ case 1:
+ return new DeliveryTag(new byte[] { 1, 1, 1 });
+ case 2:
+ return new DeliveryTag(new byte[] { 2, 2, 2 });
+ case 3:
+ return new DeliveryTag(new byte[] { 3, 3, 3 });
+ default:
+ throw new InvalidOperationException("Only supports creating three tags");
+ }
+ }
+ }
+
+ [Test]
+ public void TestSenderUsesCustomDeliveryTagGeneratorConfiguration()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(10).Queue();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+
+ StreamSenderOptions options = new StreamSenderOptions()
+ {
+ DeliveryMode = DeliveryMode.AtLeastOnce,
+ AutoSettle = true,
+ DeliveryTagGeneratorSupplier = CustomTagGenerator
+ };
+ IStreamSender sender = connection.OpenStreamSender("test-tags", options).OpenTask.Result;
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectTransfer().WithNonNullPayload()
+ .WithDeliveryTag(new byte[] { 1, 1, 1 }).Respond().WithSettled(true).WithState().Accepted();
+ peer.ExpectTransfer().WithNonNullPayload()
+ .WithDeliveryTag(new byte[] { 2, 2, 2 }).Respond().WithSettled(true).WithState().Accepted();
+ peer.ExpectTransfer().WithNonNullPayload()
+ .WithDeliveryTag(new byte[] { 3, 3, 3 }).Respond().WithSettled(true).WithState().Accepted();
+ peer.ExpectDetach().Respond();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+
+ IMessage<string> message = IMessage<string>.Create("Hello World");
+ IStreamTracker tracker1 = sender.Send(message);
+ IStreamTracker tracker2 = sender.Send(message);
+ IStreamTracker tracker3 = sender.Send(message);
+
+ Assert.IsNotNull(tracker1);
+ Assert.IsNotNull(tracker1.SettlementTask.Result);
+ Assert.IsNotNull(tracker2);
+ Assert.IsNotNull(tracker2.SettlementTask.Result);
+ Assert.IsNotNull(tracker3);
+ Assert.IsNotNull(tracker3.SettlementTask.Result);
+
+ sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+ connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestCannotCreateSenderWhenTagGeneratorReturnsNull()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
+ StreamSenderOptions options = new StreamSenderOptions()
+ {
+ DeliveryMode = DeliveryMode.AtLeastOnce,
+ AutoSettle = true,
+ DeliveryTagGeneratorSupplier = CustomNullTagGenerator
+ };
+
+ try
+ {
+ _ = connection.OpenStreamSender("test-tags", options).OpenTask.Result;
+ Assert.Fail("Should not create a sender if the tag generator is not supplied");
+ }
+ catch (ClientException)
+ {
+ // Expected
+ }
+
+ connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
+
+ peer.WaitForScriptToComplete();
+ }
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org