You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/08/13 08:06:16 UTC
[activemq-nms-amqp] branch master updated: AMQNET-596: Added local
message expiry connection uri connection property
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/master by this push:
new d579e06 AMQNET-596: Added local message expiry connection uri connection property
new 0e336f4 Merge pull request #15 from cjwmorgan-sol/localMessageExpiry
d579e06 is described below
commit d579e062f500a409333e7af14bfe9ecbc37fe3b2
Author: cmorgan-sol <Ch...@solace.com>
AuthorDate: Fri Aug 9 19:55:46 2019 -0400
AMQNET-596: Added local message expiry connection uri connection property
The message consumer only filters expired messages when local message expiry is enabled.
Added message expiration integration tests.
Signed-off-by: cjwmorgan-sol <Ch...@solace.com>
Changed the property name to LocalMessageExpiry so that the URI query parameter matches the one used by Qpid JMS.
Signed-off-by: cjwmorgan-sol <Ch...@solace.com>
---
src/NMS.AMQP/Meta/ConnectionInfo.cs | 2 +
src/NMS.AMQP/Meta/ConsumerInfo.cs | 1 +
src/NMS.AMQP/NmsConnectionFactory.cs | 8 +-
src/NMS.AMQP/NmsMessageConsumer.cs | 9 +-
.../MessageExpirationIntegrationTest.cs | 175 +++++++++++++++++++++
test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 12 +-
.../TestAmqp/TestMessageSource.cs | 7 +-
7 files changed, 205 insertions(+), 9 deletions(-)
diff --git a/src/NMS.AMQP/Meta/ConnectionInfo.cs b/src/NMS.AMQP/Meta/ConnectionInfo.cs
index 5105c76..4d73322 100644
--- a/src/NMS.AMQP/Meta/ConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/ConnectionInfo.cs
@@ -70,6 +70,8 @@ namespace Apache.NMS.AMQP.Meta
public ushort channelMax { get; set; } = DEFAULT_CHANNEL_MAX;
public int maxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
+ public bool LocalMessageExpiry { get; set; }
+
public string TopicPrefix { get; internal set; } = null;
public string QueuePrefix { get; internal set; } = null;
diff --git a/src/NMS.AMQP/Meta/ConsumerInfo.cs b/src/NMS.AMQP/Meta/ConsumerInfo.cs
index 2e8afcb..95acbe2 100644
--- a/src/NMS.AMQP/Meta/ConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/ConsumerInfo.cs
@@ -47,6 +47,7 @@ namespace Apache.NMS.AMQP.Meta
public bool HasSelector => !string.IsNullOrEmpty(Selector);
public bool IsDurable { get; set; }
public bool IsBrowser { get; set; }
+ public bool LocalMessageExpiry { get; set; }
public IDestination Destination { get; set; }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 265eaff..ecb238a 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -99,6 +99,11 @@ namespace Apache.NMS.AMQP
}
}
+ /// <summary>
+ /// Enables local message expiry for all MessageConsumers under the connection. Default is true.
+ /// </summary>
+ public bool LocalMessageExpiry { get; set; } = true;
+
/// <summary>
/// User name value used to authenticate the connection
/// </summary>
@@ -217,7 +222,8 @@ namespace Apache.NMS.AMQP
password = password,
remoteHost = BrokerUri,
requestTimeout = RequestTimeout,
- SendTimeout = SendTimeout
+ SendTimeout = SendTimeout,
+ LocalMessageExpiry = LocalMessageExpiry
};
bool userSpecifiedClientId = ClientId != null;
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 4ffbfb5..9a93a4d 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -48,14 +48,15 @@ namespace Apache.NMS.AMQP
if (destination.IsTemporary)
{
session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination) destination);
- }
-
+ }
+
Info = new ConsumerInfo(consumerId, Session.SessionInfo.Id)
{
Destination = destination,
Selector = selector,
NoLocal = noLocal,
- SubscriptionName = name
+ SubscriptionName = name,
+ LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
};
deliveryTask = new MessageDeliveryTask(this);
@@ -265,7 +266,7 @@ namespace Apache.NMS.AMQP
private bool IsMessageExpired(InboundMessageDispatch envelope)
{
NmsMessage message = envelope.Message;
- return message.IsExpired();
+ return Info.LocalMessageExpiry && message.IsExpired();
}
private bool IsRedeliveryExceeded(InboundMessageDispatch envelope)
diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageExpirationIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageExpirationIntegrationTest.cs
new file mode 100755
index 0000000..2e193ab
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/MessageExpirationIntegrationTest.cs
@@ -0,0 +1,175 @@
+using System;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Provider.Amqp.Message;
+using Moq;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+using Test.Amqp;
+
+
+namespace NMS.AMQP.Test.Integration
+{
+ [TestFixture]
+ public class MessageExpirationIntegrationTest
+ {
+ private static readonly string User = "USER";
+ private static readonly string Password = "PASSWORD";
+ private static readonly string Address = "amqp://127.0.0.1:5672";
+ private static readonly IPEndPoint IPEndPoint = new IPEndPoint(IPAddress.Any, 5672);
+
+ [Test, Timeout(4000)]
+ public void TestIncomingExpiredMessageGetsFiltered()
+ {
+ const long ttl = 200;
+ TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+ using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+ {
+ DateTime createTime = DateTime.UtcNow - time;
+ Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+ string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+ Assert.NotNull(contents, "Failed to create expired message");
+ testPeer.Open();
+ testPeer.SendMessage("myQueue", expiredMsg);
+
+ NmsConnection connection = (NmsConnection)EstablishConnection(testPeer);
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ // TODO change to verify frames sent from client to be a Modified OutCome
+ IMessage message = consumer.Receive(time);
+ Assert.IsNull(message, "A message should not have been received");
+
+ session.Close();
+ connection.Close();
+ }
+ }
+
+ [Test, Timeout(4000)]
+ public void TestIncomingExpiredMessageGetsConsumedWhenDisabled()
+ {
+ const long ttl = 200;
+ TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+ using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+ {
+ DateTime createTime = DateTime.UtcNow - time;
+ Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+ string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+ Assert.NotNull(contents, "Failed to create expired message");
+ testPeer.Open();
+ testPeer.SendMessage("myQueue", expiredMsg);
+
+ NmsConnection connection = (NmsConnection)EstablishConnection(testPeer, "nms.localMessageExpiry=false");
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ IMessage message = consumer.Receive();
+ // TODO change to verify frames sent from client to be an Accepted OutCome
+ Assert.NotNull(message, "A message should have been received");
+ Assert.NotNull(message as ITextMessage, "Received incorrect message body type {0}", message.GetType().Name);
+ Assert.AreEqual(contents, (message as ITextMessage)?.Text, "Received message with unexpected body value");
+
+ session.Close();
+ connection.Close();
+ }
+ }
+
+ [Test, Timeout(4000)]
+ public void TestIncomingExpiredMessageGetsFilteredAsync()
+ {
+ const long ttl = 200;
+ TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+ using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+ {
+ DateTime createTime = DateTime.UtcNow - time;
+ Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+ string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+ Assert.NotNull(contents, "Failed to create expired message");
+ testPeer.Open();
+ testPeer.SendMessage("myQueue", expiredMsg);
+
+ NmsConnection connection = (NmsConnection)EstablishConnection(testPeer);
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ TaskCompletionSource<IMessage> tcs = new TaskCompletionSource<IMessage>();
+ consumer.Listener += m => { tcs.SetResult(m); };
+ connection.Start();
+ // TODO change to verify frames sent from client to be a Modified OutCome
+ Assert.AreEqual(1, Task.WaitAny(tcs.Task, Task.Delay(Convert.ToInt32(ttl))), "Received message when message should not have been received");
+ Assert.IsTrue(tcs.TrySetCanceled(), "Failed to cancel receive task");
+
+ session.Close();
+ connection.Close();
+ }
+ }
+
+ [Test, Timeout(4000)]
+ public void TestIncomingExpiredMessageGetsConsumedWhenDisabledAsync()
+ {
+ const long ttl = 200;
+ TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+ using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+ {
+ DateTime createTime = DateTime.UtcNow - time;
+ Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+ string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+ Assert.NotNull(contents, "Failed to create expired message");
+ testPeer.Open();
+ testPeer.SendMessage("myQueue", expiredMsg);
+
+ NmsConnection connection = (NmsConnection)EstablishConnection(testPeer, "nms.localMessageExpiry=false");
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+ TaskCompletionSource<IMessage> tcs = new TaskCompletionSource<IMessage>();
+ consumer.Listener += m => { tcs.SetResult(m); };
+ connection.Start();
+ // TODO change to verify frames sent from client to be an Accepted OutCome
+ Assert.AreEqual(0, Task.WaitAny(tcs.Task, Task.Delay(Convert.ToInt32(ttl))), "Did not receive message when message should have been received");
+ IMessage message = tcs.Task.Result;
+ Assert.NotNull(message, "A message should have been received");
+ Assert.NotNull(message as ITextMessage, "Received incorrect message body type {0}", message.GetType().Name);
+ Assert.AreEqual(contents, (message as ITextMessage)?.Text, "Received message with unexpected body value");
+
+ session.Close();
+ connection.Close();
+ }
+ }
+
+ private static Amqp.Message CreateMessageWithExpiration(long ttl, DateTime? createTime = null, string payload = null)
+ {
+ AmqpNmsTextMessageFacade msg = new AmqpNmsTextMessageFacade();
+ msg.Initialize(null);
+ msg.NMSTimestamp = createTime ?? DateTime.UtcNow;
+ if (ttl > 0)
+ {
+ TimeSpan timeToLive = TimeSpan.FromMilliseconds(Convert.ToDouble(ttl));
+ msg.NMSTimeToLive = timeToLive;
+ msg.Expiration = msg.NMSTimestamp + timeToLive;
+ }
+ if (String.IsNullOrEmpty(payload))
+ {
+ payload = TestContext.CurrentContext.Test.FullName;
+ }
+ msg.Text = payload;
+ msg.NMSMessageId = Guid.NewGuid().ToString();
+ return msg.Message;
+ }
+
+ private static IConnection EstablishConnection(TestAmqpPeer peer, string queryParams = null)
+ {
+ string uri = String.IsNullOrEmpty(queryParams) ? peer.Address.OriginalString : $"{peer.Address.OriginalString}?{queryParams}";
+ NmsConnectionFactory factory = new NmsConnectionFactory(uri);
+ return factory.CreateConnection(User, Password);
+ }
+ }
+}
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 8bb5503..5e3b2c8 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -58,16 +58,22 @@ namespace NMS.AMQP.Test.TestAmqp
messageSources.Add(address, messageSource);
}
- public void SendMessage(string address, string payload)
+ public void SendMessage(string address, string payload)
+ {
+ Amqp.Message message = new Amqp.Message(payload) { Properties = new Amqp.Framing.Properties { MessageId = Guid.NewGuid().ToString() } };
+ SendMessage(address, message);
+ }
+
+ public void SendMessage(string address, Amqp.Message message)
{
if (messageSources.TryGetValue(address, out var messageSource))
{
- messageSource.SendMessage(payload);
+ messageSource.SendMessage(message);
}
else
{
messageSource = new TestMessageSource();
- messageSource.SendMessage(payload);
+ messageSource.SendMessage(message);
containerHost.RegisterMessageSource(address, messageSource);
messageSources.Add(address, messageSource);
}
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs
index 1fab8cf..209c468 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs
@@ -61,10 +61,15 @@ namespace NMS.AMQP.Test.TestAmqp
public void SendMessage(string payload)
{
Amqp.Message message = new Amqp.Message(payload) { Properties = new Properties { MessageId = Guid.NewGuid().ToString() } };
+ SendMessage(message);
+ }
+
+ public void SendMessage(Amqp.Message message)
+ {
lock (messages)
{
this.messages.Enqueue(message);
- }
+ }
}
public void DisposeMessage(ReceiveContext receiveContext, DispositionContext dispositionContext)