You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/21 23:11:49 UTC

svn commit: r1619590 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs main/csharp/Threads/CompositeTaskRunner.cs test/csharp/AMQRedeliveryPolicyTest.cs

Author: tabish
Date: Thu Aug 21 21:11:49 2014
New Revision: 1619590

URL: http://svn.apache.org/r1619590
Log:
https://issues.apache.org/jira/browse/AMQNET-489

Merge in fixes from AMQ-5146 to honor the redelivery policy on messages dispatched from the Broker.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1619590&r1=1619589&r2=1619590&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Aug 21 21:11:49 2014
@@ -39,6 +39,8 @@ namespace Apache.NMS.ActiveMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
+        private const int NO_MAXIMUM_REDELIVERIES = -1;
+
         private readonly MessageTransformation messageTransformation;
         private readonly MessageDispatchChannel unconsumedMessages;
         private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
@@ -789,7 +791,15 @@ namespace Apache.NMS.ActiveMQ
 						{
 							if(listener != null && this.unconsumedMessages.Running)
 							{
-								dispatchMessage = true;
+                                if (RedeliveryExceeded(dispatch)) 
+                                {
+                                    PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+                                    return;
+                                } 
+                                else
+                                {
+								    dispatchMessage = true;
+                                }
 							}
 							else
 							{
@@ -1014,6 +1024,11 @@ namespace Apache.NMS.ActiveMQ
 						}
 					}
 				}
+                else if (RedeliveryExceeded(dispatch))
+                {
+                    Tracer.DebugFormat("[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch);
+                    PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+                }
 				else
 				{
 					return dispatch;
@@ -1231,7 +1246,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 
 	        // evaluate both expired and normal msgs as otherwise consumer may get stalled
-			if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
+			if ((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
 			{
 				this.session.SendAck(pendingAck);
 				this.pendingAck = null;
@@ -1250,6 +1265,18 @@ namespace Apache.NMS.ActiveMQ
 	        this.session.Connection.SyncRequest(ack);
 	    }
 
+        private void PosionAck(MessageDispatch dispatch, string cause)
+        {
+            BrokerError poisonCause = new BrokerError();
+            poisonCause.ExceptionClass = "javax.jms.JMSException";
+            poisonCause.Message = cause;
+
+            MessageAck posionAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
+            posionAck.FirstMessageId = dispatch.Message.MessageId;
+            posionAck.PoisonCause = poisonCause;
+            this.session.Connection.SyncRequest(posionAck);
+        }
+
 	    private void RegisterSync()
 		{
 			// Don't acknowledge now, but we may need to let the broker know the
@@ -1387,14 +1414,19 @@ namespace Apache.NMS.ActiveMQ
                                                this.info.ConsumerId, this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
                         }
 
+                        BrokerError poisonCause = new BrokerError();
+                        poisonCause.ExceptionClass = "javax.jms.JMSException";
+                        poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;
+
 						if (lastMd.RollbackCause != null)
 						{
 							BrokerError cause = new BrokerError();
-							cause.ExceptionClass = "javax.jms.JMSException";
-							cause.Message = lastMd.RollbackCause.Message;
-							ack.PoisonCause = cause;
+                            poisonCause.ExceptionClass = "javax.jms.JMSException";
+                            poisonCause.Message = lastMd.RollbackCause.Message;
+                            poisonCause.Cause = cause;
 						}
                     	ack.FirstMessageId = firstMsgId;
+                        ack.PoisonCause = poisonCause;
 
 						this.session.SendAck(ack);
 
@@ -1743,6 +1775,33 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
+        private bool RedeliveryExceeded(MessageDispatch dispatch) 
+        {
+            try 
+            {
+                ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage;
+
+                Tracer.Debug("Checking if Redelivery count is exceeded.");
+                Tracer.DebugFormat("Current policy = {0}", RedeliveryPolicy.MaximumRedeliveries);
+                Tracer.DebugFormat("Message Redelivery Count = {0}", dispatch.RedeliveryCounter);
+                Tracer.DebugFormat("Is Transacted? {0}", session.IsTransacted);
+                Tracer.DebugFormat("Is Message from redelivery plugin? {0}", amqMessage.Properties.Contains("redeliveryDelay"));
+
+                bool result = session.IsTransacted && redeliveryPolicy != null &&
+                       redeliveryPolicy.MaximumRedeliveries != NO_MAXIMUM_REDELIVERIES &&
+                       dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries &&
+                       // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
+                       !amqMessage.Properties.Contains("redeliveryDelay");
+
+                Tracer.DebugFormat("Exceeded Redelivery Max? {0}", result);
+                return result;
+            }
+            catch (Exception ignored) 
+            {
+                return false;
+            }
+        }
+
 		#endregion
 
 		#region Nested ISyncronization Types

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1619590&r1=1619589&r2=1619590&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs Thu Aug 21 21:11:49 2014
@@ -17,7 +17,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Threading;
 
 namespace Apache.NMS.ActiveMQ.Threads

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1619590&r1=1619589&r2=1619590&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Aug 21 21:11:49 2014
@@ -19,6 +19,7 @@ using System;
 using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
 using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test
@@ -27,6 +28,7 @@ namespace Apache.NMS.ActiveMQ.Test
     public class AMQRedeliveryPolicyTest : NMSTestSupport
     {
         private const string DESTINATION_NAME = "TEST.RedeliveryPolicyTestDest";
+        private const string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
 
         [Test]
         public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
@@ -443,5 +445,161 @@ namespace Apache.NMS.ActiveMQ.Test
                 session.Rollback();
             }
         }
+
+        [Test]
+        public void TestRepeatedRedeliveryReceiveNoCommit() 
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                connection.Start();
+
+                ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryReceiveNoCommit");
+                IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                connection.DeleteDestination(destination);
+                connection.DeleteDestination(dlq);
+                IMessageProducer producer = dlqSession.CreateProducer(destination);
+                producer.Send(dlqSession.CreateTextMessage("1st"));
+                IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq);
+
+                const int maxRedeliveries = 4;
+                for (int i = 0; i <= maxRedeliveries + 1; i++) 
+                {
+                    using(Connection loopConnection = (Connection) CreateConnection())
+                    {
+                        // Receive a message with the NMS API
+                        IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy;
+                        policy.InitialRedeliveryDelay = 0;
+                        policy.UseExponentialBackOff = false;
+                        policy.MaximumRedeliveries = maxRedeliveries;
+
+                        loopConnection.Start();
+                        ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                        IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                        ActiveMQTextMessage m = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ActiveMQTextMessage;
+                        if (m != null) 
+                        {
+                            Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter);
+                        }
+
+                        if (i <= maxRedeliveries)
+                        {
+                            Assert.IsNotNull(m);
+                            Assert.AreEqual("1st", m.Text);
+                            Assert.AreEqual(i, m.RedeliveryCounter);
+                        } 
+                        else
+                        {
+                            Assert.IsNull(m, "null on exceeding redelivery count");
+                        }
+                    }
+                }
+
+                // We should be able to get the message off the DLQ now.
+                ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+                Assert.IsNotNull(msg, "Got message from DLQ");
+                Assert.AreEqual("1st", msg.Text);
+                String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause != null) 
+                {
+                    Tracer.DebugFormat("Rollback Cause = {0}", cause);
+                    Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref");
+                }
+                else
+                {
+                    Tracer.Debug("DLQ'd message has no cause tag.");
+                }
+            }
+        }
+
+        [Test]
+        public void TestRepeatedRedeliveryOnMessageNoCommit() 
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                connection.Start();
+                ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryOnMessageNoCommit");
+                IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                connection.DeleteDestination(destination);
+                connection.DeleteDestination(dlq);
+                IMessageProducer producer = dlqSession.CreateProducer(destination);
+                IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq);
+
+                producer.Send(dlqSession.CreateTextMessage("1st"));
+
+                const int maxRedeliveries = 4;
+                Atomic<int> receivedCount = new Atomic<int>(0);
+
+                for (int i = 0; i <= maxRedeliveries + 1; i++) 
+                {
+                    using(Connection loopConnection = (Connection) CreateConnection())
+                    {
+                        IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy;
+                        policy.InitialRedeliveryDelay = 0;
+                        policy.UseExponentialBackOff = false;
+                        policy.MaximumRedeliveries = maxRedeliveries;
+
+                        loopConnection.Start();
+                        ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                        IMessageConsumer consumer = session.CreateConsumer(destination);
+                        OnMessageNoCommitCallback callback = new OnMessageNoCommitCallback(receivedCount);
+                        consumer.Listener += new MessageListener(callback.consumer_Listener);
+
+                        if (i <= maxRedeliveries) 
+                        {
+                            Assert.IsTrue(callback.Await(), "listener should have dispatched a message");
+                        } 
+                        else 
+                        {
+                            // final redelivery gets poisoned before dispatch
+                            Assert.IsFalse(callback.Await(), "listener should not have dispatched after max redliveries");
+                        }
+                    }
+                }
+
+                // We should be able to get the message off the DLQ now.
+                ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+                Assert.IsNotNull(msg, "Got message from DLQ");
+                Assert.AreEqual("1st", msg.Text);
+                String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+                if (cause != null) 
+                {
+                    Tracer.DebugFormat("Rollback Cause = {0}", cause);
+                    Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref");
+                }
+                else
+                {
+                    Tracer.Debug("DLQ'd message has no cause tag.");
+                }
+            }
+        }
+
+        class OnMessageNoCommitCallback
+        {
+            private Atomic<int> receivedCount;
+            private CountDownLatch done = new CountDownLatch(1);
+
+            public OnMessageNoCommitCallback(Atomic<int> receivedCount)
+            {
+                this.receivedCount = receivedCount;
+            }
+
+            public bool Await() 
+            {
+                return done.await(TimeSpan.FromMilliseconds(5000));
+            }
+
+            public void consumer_Listener(IMessage message)
+            {
+                ActiveMQTextMessage m = message as ActiveMQTextMessage;
+                Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter);
+                Assert.AreEqual("1st", m.Text);
+                Assert.AreEqual(receivedCount.Value, m.RedeliveryCounter);
+                receivedCount.GetAndSet(receivedCount.Value + 1);
+                done.countDown();
+            }
+        }
     }
 }