You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/08/13 00:59:21 UTC

svn commit: r1695620 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs test/csharp/ZeroPrefetchConsumerTest.cs

Author: tabish
Date: Wed Aug 12 22:59:21 2015
New Revision: 1695620

URL: http://svn.apache.org/r1695620
Log:
Ensure that a new pull request is sent to the broker after a message that exceeds the maximum redelivery count has been poison-acked.
Fixes [AMQNET-507]. (See https://issues.apache.org/jira/browse/AMQNET-507)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ZeroPrefetchConsumerTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1695620&r1=1695619&r2=1695620&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Aug 12 22:59:21 2015
@@ -1065,6 +1065,22 @@ namespace Apache.NMS.ActiveMQ
                                        ConsumerId, dispatch);
                     PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
                                         "policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+
+                    // Refresh the dispatch time
+                    dispatchTime = DateTime.Now;
+
+                    if(dispatchTime > deadline)
+                    {
+                        // Out of time.
+                        timeout = TimeSpan.Zero;
+                    }
+                    else
+                    {
+                        // Adjust the timeout to the remaining time.
+                        timeout = deadline - dispatchTime;
+                    }
+
+                    SendPullRequest((long) timeout.TotalMilliseconds);
                 }
                 else
                 {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ZeroPrefetchConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ZeroPrefetchConsumerTest.cs?rev=1695620&r1=1695619&r2=1695620&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ZeroPrefetchConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ZeroPrefetchConsumerTest.cs Wed Aug 12 22:59:21 2015
@@ -154,6 +154,54 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsNull(answer, "Should have not received a message!");
         }
 
+        [Test]
+        public void TestConsumerReceivePrefetchZeroRedeliveryZero()
+        {
+            const string QUEUE_NAME = "TEST.TestConsumerReceivePrefetchZeroRedeliveryZero";
+            // Test added with the AMQNET-507 fix: Receive() with prefetch 0 and MaximumRedeliveries 0 must return null.
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            {              
+                session.DeleteDestination(queue);
+
+                using (IMessageProducer producer = session.CreateProducer(queue))
+                {
+                    ITextMessage textMessage = session.CreateTextMessage("test Message");
+                    producer.Send(textMessage);
+                }
+            }
+
+            // Receive the message in a transacted session and roll back, which should increase its redelivery counter.
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            using (IMessageConsumer consumer = session.CreateConsumer(queue))
+            {              
+                connection.Start();
+                IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.IsNotNull(message);
+                session.Rollback();
+            }
+
+            // Receive again with prefetch 0 and no redeliveries allowed; expect the 3000 ms wait to expire and yield null.
+            using (Connection connection = CreateConnection() as Connection)
+            {
+                connection.PrefetchPolicy.All = 0;
+                connection.RedeliveryPolicy.MaximumRedeliveries = 0;
+                connection.Start();
+                // NOTE(review): unlike the sessions above, this one has no using block; presumably disposing the connection closes it - confirm.
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue(QUEUE_NAME);
+
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                    Assert.IsNull(message);
+                }
+            }
+        }
+
         [SetUp]
         public override void SetUp()
         {