You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/01 00:01:58 UTC
svn commit: r1087456 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src:
main/csharp/MessageConsumer.cs test/csharp/AMQRedeliveryPolicyTest.cs
Author: tabish
Date: Thu Mar 31 22:01:57 2011
New Revision: 1087456
URL: http://svn.apache.org/viewvc?rev=1087456&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-323
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs?rev=1087456&r1=1087455&r2=1087456&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs Thu Mar 31 22:01:57 2011
@@ -661,18 +661,7 @@ namespace Apache.NMS.ActiveMQ
MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
if(dispatch != null)
{
- try
- {
- ActiveMQMessage message = CreateActiveMQMessage(dispatch);
- BeforeMessageIsConsumed(dispatch);
- listener(message);
- AfterMessageIsConsumed(dispatch, false);
- }
- catch(NMSException e)
- {
- this.session.Connection.OnSessionException(this.session, e);
- }
-
+ this.Dispatch(dispatch);
return true;
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1087456&r1=1087455&r2=1087456&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Mar 31 22:01:57 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.ActiveMQ.Commands;
using NUnit.Framework;
@@ -343,5 +344,104 @@ namespace Apache.NMS.ActiveMQ.Test
}
}
+ [Test]
+ public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.MaximumRedeliveries = -1;
+ policy.InitialRedeliveryDelay = 500;
+ policy.UseExponentialBackOff = false;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+
+ IMessageProducer producer = session.CreateProducer(destination);
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ // Send the messages
+ ITextMessage textMessage = session.CreateTextMessage("1st");
+ textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+ producer.Send(textMessage);
+ session.Commit();
+
+ ITextMessage m;
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // No delay on first Rollback..
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNotNull(m);
+ session.Rollback();
+
+ // Show subsequent re-delivery delay is incrementing.
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNull(m);
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // The message gets redelivered after 500 ms every time since
+ // we are not using exponential backoff.
+ m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNull(m);
+ }
+ }
+
+ [Test]
+ public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.MaximumRedeliveries = -1;
+ policy.InitialRedeliveryDelay = 500;
+ policy.UseExponentialBackOff = false;
+
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+
+ IMessageProducer producer = session.CreateProducer(destination);
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ CallbackClass cc = new CallbackClass(session);
+ consumer.Listener += new MessageListener(cc.consumer_Listener);
+
+ // Send the messages
+ ITextMessage textMessage = session.CreateTextMessage("1st");
+ textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+ producer.Send(textMessage, MsgDeliveryMode.Persistent,MsgPriority.Normal,TimeSpan.FromMilliseconds(800.0));
+ session.Commit();
+
+ // sends normal message, then immediate retry, then retry after 500 ms, then expire.
+ Thread.Sleep(2000);
+ Assert.AreEqual(3, cc.numReceived);
+ }
+ }
+
+ class CallbackClass
+ {
+ private ISession session;
+ public int numReceived = 0;
+
+ public CallbackClass(ISession session)
+ {
+ this.session = session;
+ }
+
+ public void consumer_Listener(IMessage message)
+ {
+ numReceived++;
+ ITextMessage m = message as ITextMessage;
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+ }
+ }
}
}