You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/08/13 08:06:16 UTC

[activemq-nms-amqp] branch master updated: AMQNET-596: Added local message expiry connection uri connection property

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/master by this push:
     new d579e06  AMQNET-596: Added local message expiry connection uri connection property
     new 0e336f4  Merge pull request #15 from cjwmorgan-sol/localMessageExpiry
d579e06 is described below

commit d579e062f500a409333e7af14bfe9ecbc37fe3b2
Author: cmorgan-sol <Ch...@solace.com>
AuthorDate: Fri Aug 9 19:55:46 2019 -0400

    AMQNET-596: Added local message expiry connection uri connection property
    
    message consumer only filters expired messages when local message expiry is enabled
    
    added message expiration integration test
    
    Signed-off-by: cjwmorgan-sol <Ch...@solace.com>
    
    Changed property name to LocalMessageExpiry to make the uri query parameter property match qpid jms
    
    Signed-off-by: cjwmorgan-sol <Ch...@solace.com>
---
 src/NMS.AMQP/Meta/ConnectionInfo.cs                |   2 +
 src/NMS.AMQP/Meta/ConsumerInfo.cs                  |   1 +
 src/NMS.AMQP/NmsConnectionFactory.cs               |   8 +-
 src/NMS.AMQP/NmsMessageConsumer.cs                 |   9 +-
 .../MessageExpirationIntegrationTest.cs            | 175 +++++++++++++++++++++
 test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs |  12 +-
 .../TestAmqp/TestMessageSource.cs                  |   7 +-
 7 files changed, 205 insertions(+), 9 deletions(-)

diff --git a/src/NMS.AMQP/Meta/ConnectionInfo.cs b/src/NMS.AMQP/Meta/ConnectionInfo.cs
index 5105c76..4d73322 100644
--- a/src/NMS.AMQP/Meta/ConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/ConnectionInfo.cs
@@ -70,6 +70,8 @@ namespace Apache.NMS.AMQP.Meta
         public ushort channelMax { get; set; } = DEFAULT_CHANNEL_MAX;
         public int maxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
 
+        public bool LocalMessageExpiry { get; set; }
+
         public string TopicPrefix { get; internal set; } = null;
 
         public string QueuePrefix { get; internal set; } = null;
diff --git a/src/NMS.AMQP/Meta/ConsumerInfo.cs b/src/NMS.AMQP/Meta/ConsumerInfo.cs
index 2e8afcb..95acbe2 100644
--- a/src/NMS.AMQP/Meta/ConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/ConsumerInfo.cs
@@ -47,6 +47,7 @@ namespace Apache.NMS.AMQP.Meta
         public bool HasSelector => !string.IsNullOrEmpty(Selector);
         public bool IsDurable { get; set; }
         public bool IsBrowser { get; set; }
+        public bool LocalMessageExpiry { get; set; }
         public IDestination Destination { get; set; }
     }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index 265eaff..ecb238a 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -99,6 +99,11 @@ namespace Apache.NMS.AMQP
             }
         }
 
+        /// <summary>
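+        /// Enables local message expiry for all MessageConsumers under the connection: when enabled, consumers filter out incoming messages that have already expired instead of delivering them. Default is true.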
+        /// </summary>
+        public bool LocalMessageExpiry { get; set; } = true;
+
         /// <summary>
         /// User name value used to authenticate the connection
         /// </summary>
@@ -217,7 +222,8 @@ namespace Apache.NMS.AMQP
                 password = password,
                 remoteHost = BrokerUri,
                 requestTimeout = RequestTimeout,
-                SendTimeout = SendTimeout
+                SendTimeout = SendTimeout,
+                LocalMessageExpiry = LocalMessageExpiry
             };
 
             bool userSpecifiedClientId = ClientId != null;
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index 4ffbfb5..9a93a4d 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -48,14 +48,15 @@ namespace Apache.NMS.AMQP
             if (destination.IsTemporary)
             {
                 session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination) destination);
-            }
-            
+            }
+
             Info = new ConsumerInfo(consumerId, Session.SessionInfo.Id)
             {
                 Destination = destination,
                 Selector = selector,
                 NoLocal = noLocal,
-                SubscriptionName = name
+                SubscriptionName = name,
+                LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry
             };
             deliveryTask = new MessageDeliveryTask(this);
 
@@ -265,7 +266,7 @@ namespace Apache.NMS.AMQP
         private bool IsMessageExpired(InboundMessageDispatch envelope)
         {
             NmsMessage message = envelope.Message;
-            return message.IsExpired();
+            return Info.LocalMessageExpiry && message.IsExpired();
         }
 
         private bool IsRedeliveryExceeded(InboundMessageDispatch envelope)
diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageExpirationIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageExpirationIntegrationTest.cs
new file mode 100755
index 0000000..2e193ab
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/MessageExpirationIntegrationTest.cs
@@ -0,0 +1,175 @@
+using System;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Provider.Amqp.Message;
+using Moq;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+using Test.Amqp;
+
+
+namespace NMS.AMQP.Test.Integration
+{
+    [TestFixture]
+    public class MessageExpirationIntegrationTest
+    {
+        private static readonly string User = "USER";
+        private static readonly string Password = "PASSWORD";
+        private static readonly string Address = "amqp://127.0.0.1:5672";
+        private static readonly IPEndPoint IPEndPoint = new IPEndPoint(IPAddress.Any, 5672);
+
+        [Test, Timeout(4000)]
+        public void TestIncomingExpiredMessageGetsFiltered()
+        {
+            const long ttl = 200;
+            TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+            {
+                DateTime createTime = DateTime.UtcNow - time;
+                Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+                string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+                Assert.NotNull(contents, "Failed to create expired message");
+                testPeer.Open();
+                testPeer.SendMessage("myQueue", expiredMsg);
+
+                NmsConnection connection = (NmsConnection)EstablishConnection(testPeer);
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                // TODO: verify that the frames sent by the client carry a Modified outcome
+                IMessage message = consumer.Receive(time);
+                Assert.IsNull(message, "A message should not have been received");
+
+                session.Close();
+                connection.Close();
+            }
+        }
+
+        [Test, Timeout(4000)]
+        public void TestIncomingExpiredMessageGetsConsumedWhenDisabled()
+        {
+            const long ttl = 200;
+            TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+            {
+                DateTime createTime = DateTime.UtcNow - time;
+                Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+                string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+                Assert.NotNull(contents, "Failed to create expired message");
+                testPeer.Open();
+                testPeer.SendMessage("myQueue", expiredMsg);
+
+                NmsConnection connection = (NmsConnection)EstablishConnection(testPeer, "nms.localMessageExpiry=false");
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                IMessage message = consumer.Receive();
+                // TODO: verify that the frames sent by the client carry an Accepted outcome
+                Assert.NotNull(message, "A message should have been received");
+                Assert.NotNull(message as ITextMessage, "Received incorrect message body type {0}", message.GetType().Name);
+                Assert.AreEqual(contents, (message as ITextMessage)?.Text, "Received message with unexpected body value");
+
+                session.Close();
+                connection.Close();
+            }
+        }
+
+        [Test, Timeout(4000)]
+        public void TestIncomingExpiredMessageGetsFilteredAsync()
+        {
+            const long ttl = 200;
+            TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+            {
+                DateTime createTime = DateTime.UtcNow - time;
+                Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+                string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+                Assert.NotNull(contents, "Failed to create expired message");
+                testPeer.Open();
+                testPeer.SendMessage("myQueue", expiredMsg);
+
+                NmsConnection connection = (NmsConnection)EstablishConnection(testPeer);
+                
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                TaskCompletionSource<IMessage> tcs = new TaskCompletionSource<IMessage>();
+                consumer.Listener += m => { tcs.SetResult(m); };
+                connection.Start();
+                // TODO: verify that the frames sent by the client carry a Modified outcome
+                Assert.AreEqual(1, Task.WaitAny(tcs.Task, Task.Delay(Convert.ToInt32(ttl))), "Received message when message should not have been received");
+                Assert.IsTrue(tcs.TrySetCanceled(), "Failed to cancel receive task");
+                
+                session.Close();
+                connection.Close();
+            }
+        }
+
+        [Test, Timeout(4000)]
+        public void TestIncomingExpiredMessageGetsConsumedWhenDisabledAsync()
+        {
+            const long ttl = 200;
+            TimeSpan time = TimeSpan.FromMilliseconds(ttl + 1);
+            using (TestAmqpPeer testPeer = new TestAmqpPeer(Address, User, Password))
+            {
+                DateTime createTime = DateTime.UtcNow - time;
+                Amqp.Message expiredMsg = CreateMessageWithExpiration(ttl, createTime);
+                string contents = (expiredMsg.BodySection as Amqp.Framing.AmqpValue)?.Value as string;
+                Assert.NotNull(contents, "Failed to create expired message");
+                testPeer.Open();
+                testPeer.SendMessage("myQueue", expiredMsg);
+
+                NmsConnection connection = (NmsConnection)EstablishConnection(testPeer, "nms.localMessageExpiry=false");
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                TaskCompletionSource<IMessage> tcs = new TaskCompletionSource<IMessage>();
+                consumer.Listener += m => { tcs.SetResult(m); };
+                connection.Start();
+                // TODO: verify that the frames sent by the client carry an Accepted outcome
+                Assert.AreEqual(0, Task.WaitAny(tcs.Task, Task.Delay(Convert.ToInt32(ttl))), "Did not receive message when message should have been received");
+                IMessage message = tcs.Task.Result;
+                Assert.NotNull(message, "A message should have been received");
+                Assert.NotNull(message as ITextMessage, "Received incorrect message body type {0}", message.GetType().Name);
+                Assert.AreEqual(contents, (message as ITextMessage)?.Text, "Received message with unexpected body value");
+
+                session.Close();
+                connection.Close();
+            }
+        }
+
+        private static Amqp.Message CreateMessageWithExpiration(long ttl, DateTime? createTime = null, string payload = null)
+        {
+            AmqpNmsTextMessageFacade msg = new AmqpNmsTextMessageFacade();
+            msg.Initialize(null);
+            msg.NMSTimestamp = createTime ?? DateTime.UtcNow;
+            if (ttl > 0)
+            {
+                TimeSpan timeToLive = TimeSpan.FromMilliseconds(Convert.ToDouble(ttl));
+                msg.NMSTimeToLive = timeToLive;
+                msg.Expiration = msg.NMSTimestamp + timeToLive;
+            }
+            if (String.IsNullOrEmpty(payload))
+            {
+                payload = TestContext.CurrentContext.Test.FullName;
+            }
+            msg.Text = payload;
+            msg.NMSMessageId = Guid.NewGuid().ToString();
+            return msg.Message;
+        }
+
+        private static IConnection EstablishConnection(TestAmqpPeer peer, string queryParams = null)
+        {
+            string uri = String.IsNullOrEmpty(queryParams) ? peer.Address.OriginalString : $"{peer.Address.OriginalString}?{queryParams}";
+            NmsConnectionFactory factory = new NmsConnectionFactory(uri);
+            return factory.CreateConnection(User, Password);
+        }
+    }
+}
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 8bb5503..5e3b2c8 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -58,16 +58,22 @@ namespace NMS.AMQP.Test.TestAmqp
             messageSources.Add(address, messageSource);
         }
 
-        public void SendMessage(string address, string payload)
+        public void SendMessage(string address, string payload)
+        {
+            Amqp.Message message = new Amqp.Message(payload) { Properties = new Amqp.Framing.Properties { MessageId = Guid.NewGuid().ToString() } };
+            SendMessage(address, message);
+        }
+
+        public void SendMessage(string address, Amqp.Message message)
         {
             if (messageSources.TryGetValue(address, out var messageSource))
             {
-                messageSource.SendMessage(payload);
+                messageSource.SendMessage(message);
             }
             else
             {
                 messageSource = new TestMessageSource();
-                messageSource.SendMessage(payload);
+                messageSource.SendMessage(message);
                 containerHost.RegisterMessageSource(address, messageSource);
                 messageSources.Add(address, messageSource);
             }
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs
index 1fab8cf..209c468 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestMessageSource.cs
@@ -61,10 +61,15 @@ namespace NMS.AMQP.Test.TestAmqp
         public void SendMessage(string payload)
         {
             Amqp.Message message = new Amqp.Message(payload) { Properties = new Properties { MessageId = Guid.NewGuid().ToString() } };
+            SendMessage(message);
+        }
+
+        public void SendMessage(Amqp.Message message)
+        {
             lock (messages)
             {
                 this.messages.Enqueue(message);
-            }
+            }
         }
 
         public void DisposeMessage(ReceiveContext receiveContext, DispositionContext dispositionContext)