You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/01 00:01:58 UTC

svn commit: r1087456 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src: main/csharp/MessageConsumer.cs test/csharp/AMQRedeliveryPolicyTest.cs

Author: tabish
Date: Thu Mar 31 22:01:57 2011
New Revision: 1087456

URL: http://svn.apache.org/viewvc?rev=1087456&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-323

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs?rev=1087456&r1=1087455&r2=1087456&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs Thu Mar 31 22:01:57 2011
@@ -661,18 +661,7 @@ namespace Apache.NMS.ActiveMQ
 				MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
 				if(dispatch != null)
 				{
-					try
-					{
-						ActiveMQMessage message = CreateActiveMQMessage(dispatch);
-						BeforeMessageIsConsumed(dispatch);
-						listener(message);
-						AfterMessageIsConsumed(dispatch, false);
-					}
-					catch(NMSException e)
-					{
-						this.session.Connection.OnSessionException(this.session, e);
-					}
-
+					this.Dispatch(dispatch);
 					return true;
 				}
 			}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1087456&r1=1087455&r2=1087456&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Mar 31 22:01:57 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.ActiveMQ.Commands;
 using NUnit.Framework;
@@ -343,5 +344,104 @@ namespace Apache.NMS.ActiveMQ.Test
             }
         }
 
+        [Test] // Synchronous Receive() variant: keeps rolling back one message that was sent with an 800 ms time-to-live.
+        public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+                policy.MaximumRedeliveries = -1; // presumably -1 means "no redelivery limit" - TODO confirm
+                policy.InitialRedeliveryDelay = 500;
+                policy.UseExponentialBackOff = false;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+
+                IMessageProducer producer = session.CreateProducer(destination);
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                // Send a single message with an 800 ms time-to-live and commit the send.
+                ITextMessage textMessage = session.CreateTextMessage("1st");
+                textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+                producer.Send(textMessage);
+                session.Commit();
+
+                ITextMessage m;
+                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(m);
+                Assert.AreEqual("1st", m.Text);
+                session.Rollback();
+
+                // After the first rollback the message is expected back within 100 ms (no redelivery delay yet).
+                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+                Assert.IsNotNull(m);
+                session.Rollback();
+
+                // After the second rollback nothing arrives within 100 ms, but the message does arrive within the next 700 ms (InitialRedeliveryDelay is 500).
+                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+                Assert.IsNull(m);
+                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+                Assert.IsNotNull(m);
+                Assert.AreEqual("1st", m.Text);
+                session.Rollback();
+
+                // After the third rollback no message is expected within 700 ms; presumably the
+                // 800 ms time-to-live has elapsed by then and the message has expired - TODO confirm.
+                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+                Assert.IsNull(m);
+            }
+        }
+
+        [Test] // Listener variant: a CallbackClass listener rolls back every delivery; the test then checks the delivery count.
+        public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+                policy.MaximumRedeliveries = -1; // presumably -1 means "no redelivery limit" - TODO confirm
+                policy.InitialRedeliveryDelay = 500;
+                policy.UseExponentialBackOff = false;
+
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+
+                IMessageProducer producer = session.CreateProducer(destination);
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+                CallbackClass cc = new CallbackClass(session);
+                consumer.Listener += new MessageListener(cc.consumer_Listener);
+
+                // Send a single persistent message with an 800 ms time-to-live and commit the send.
+                ITextMessage textMessage = session.CreateTextMessage("1st");
+                textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0); // NOTE(review): the same 800 ms value is passed to Send() on the next line
+                producer.Send(textMessage, MsgDeliveryMode.Persistent,MsgPriority.Normal,TimeSpan.FromMilliseconds(800.0));
+                session.Commit();
+
+                // Expected deliveries: the initial one, an immediate redelivery, and one more after 500 ms; after that the message expires.
+                Thread.Sleep(2000);
+                Assert.AreEqual(3, cc.numReceived);
+            }
+        }
+
+        class CallbackClass // Listener helper: counts deliveries, checks the message text and rolls back the session each time.
+        {
+            private ISession session; // session that consumer_Listener rolls back
+            public int numReceived = 0; // number of times consumer_Listener has been invoked
+
+            public CallbackClass(ISession session)
+            {
+                this.session = session;
+            }
+
+            public void consumer_Listener(IMessage message)
+            {
+                numReceived++; // NOTE(review): plain increment; presumably invoked from a dispatch thread other than the test thread - confirm
+                ITextMessage m = message as ITextMessage;
+                Assert.IsNotNull(m); // NOTE(review): an assert failing inside the listener may not fail the test itself - confirm
+                Assert.AreEqual("1st", m.Text);
+                session.Rollback(); // roll back on every delivery
+            }
+        }
     }
 }