You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/21 23:11:49 UTC
svn commit: r1619590 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src:
main/csharp/MessageConsumer.cs main/csharp/Threads/CompositeTaskRunner.cs
test/csharp/AMQRedeliveryPolicyTest.cs
Author: tabish
Date: Thu Aug 21 21:11:49 2014
New Revision: 1619590
URL: http://svn.apache.org/r1619590
Log:
https://issues.apache.org/jira/browse/AMQNET-489
Merge in fixes from AMQ-5146 to honor the redelivery policy on messages dispatched from the Broker.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1619590&r1=1619589&r2=1619590&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Aug 21 21:11:49 2014
@@ -39,6 +39,8 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public class MessageConsumer : IMessageConsumer, IDispatcher
{
+ private const int NO_MAXIMUM_REDELIVERIES = -1;
+
private readonly MessageTransformation messageTransformation;
private readonly MessageDispatchChannel unconsumedMessages;
private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
@@ -789,7 +791,15 @@ namespace Apache.NMS.ActiveMQ
{
if(listener != null && this.unconsumedMessages.Running)
{
- dispatchMessage = true;
+ if (RedeliveryExceeded(dispatch))
+ {
+ PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+ return;
+ }
+ else
+ {
+ dispatchMessage = true;
+ }
}
else
{
@@ -1014,6 +1024,11 @@ namespace Apache.NMS.ActiveMQ
}
}
}
+ else if (RedeliveryExceeded(dispatch))
+ {
+ Tracer.DebugFormat("[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch);
+ PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+ }
else
{
return dispatch;
@@ -1231,7 +1246,7 @@ namespace Apache.NMS.ActiveMQ
}
// evaluate both expired and normal msgs as otherwise consumer may get stalled
- if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
+ if ((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
{
this.session.SendAck(pendingAck);
this.pendingAck = null;
@@ -1250,6 +1265,18 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.SyncRequest(ack);
}
+ /// <summary>
+ /// Builds a poison acknowledgement for the message carried by the given dispatch
+ /// and sends it through the session's connection as a synchronous request.
+ /// </summary>
+ /// <param name="dispatch">The dispatch whose message is being poison-acked.</param>
+ /// <param name="cause">Text stored as the message of the ack's poison cause.</param>
+ private void PosionAck(MessageDispatch dispatch, string cause)
+ {
+     // The trailing 1 is passed straight to the MessageAck constructor
+     // (presumably the number of messages covered by this ack - confirm).
+     MessageAck ack = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
+     ack.FirstMessageId = dispatch.Message.MessageId;
+
+     // Describe why the message is being poisoned; the error is labelled with a
+     // Java exception class name.
+     BrokerError reason = new BrokerError();
+     reason.Message = cause;
+     reason.ExceptionClass = "javax.jms.JMSException";
+     ack.PoisonCause = reason;
+
+     this.session.Connection.SyncRequest(ack);
+ }
+
private void RegisterSync()
{
// Don't acknowledge now, but we may need to let the broker know the
@@ -1387,14 +1414,19 @@ namespace Apache.NMS.ActiveMQ
this.info.ConsumerId, this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
}
+ BrokerError poisonCause = new BrokerError();
+ poisonCause.ExceptionClass = "javax.jms.JMSException";
+ poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;
+
if (lastMd.RollbackCause != null)
{
BrokerError cause = new BrokerError();
- cause.ExceptionClass = "javax.jms.JMSException";
- cause.Message = lastMd.RollbackCause.Message;
- ack.PoisonCause = cause;
+ poisonCause.ExceptionClass = "javax.jms.JMSException";
+ poisonCause.Message = lastMd.RollbackCause.Message;
+ poisonCause.Cause = cause;
}
ack.FirstMessageId = firstMsgId;
+ ack.PoisonCause = poisonCause;
this.session.SendAck(ack);
@@ -1743,6 +1775,33 @@ namespace Apache.NMS.ActiveMQ
}
}
+ /// <summary>
+ /// Determines whether a dispatched message has been redelivered more often than
+ /// this consumer's redelivery policy permits.
+ /// </summary>
+ /// <remarks>
+ /// The limit is only considered exceeded when the session is transacted, a policy
+ /// is set, the policy maximum is not NO_MAXIMUM_REDELIVERIES, the dispatch's
+ /// redelivery counter is greater than the policy maximum, and the message does not
+ /// carry a "redeliveryDelay" property. Any exception raised while evaluating is
+ /// logged and treated as "not exceeded".
+ /// </remarks>
+ /// <param name="dispatch">The dispatch to evaluate.</param>
+ /// <returns>true when the redelivery limit is exceeded; otherwise false.</returns>
+ private bool RedeliveryExceeded(MessageDispatch dispatch)
+ {
+     try
+     {
+         ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage;
+         if (amqMessage == null || redeliveryPolicy == null)
+         {
+             // Without an ActiveMQ message or a policy there is nothing to compare
+             // against; answer explicitly instead of relying on a caught
+             // NullReferenceException.
+             return false;
+         }
+
+         // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
+         bool fromRedeliveryPlugin = amqMessage.Properties.Contains("redeliveryDelay");
+
+         Tracer.Debug("Checking if Redelivery count is exceeded.");
+         Tracer.DebugFormat("Current policy = {0}", redeliveryPolicy.MaximumRedeliveries);
+         Tracer.DebugFormat("Message Redelivery Count = {0}", dispatch.RedeliveryCounter);
+         Tracer.DebugFormat("Is Transacted? {0}", session.IsTransacted);
+         Tracer.DebugFormat("Is Message from redelivery plugin? {0}", fromRedeliveryPlugin);
+
+         bool result = session.IsTransacted &&
+                       redeliveryPolicy.MaximumRedeliveries != NO_MAXIMUM_REDELIVERIES &&
+                       dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries &&
+                       !fromRedeliveryPlugin;
+
+         Tracer.DebugFormat("Exceeded Redelivery Max? {0}", result);
+         return result;
+     }
+     catch (Exception ex)
+     {
+         // Best effort: a failure here still answers "not exceeded", but it is
+         // logged rather than silently discarded.
+         Tracer.WarnFormat("Could not evaluate redelivery limit, treating as not exceeded: {0}", ex.Message);
+         return false;
+     }
+ }
+
#endregion
#region Nested ISyncronization Types
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1619590&r1=1619589&r2=1619590&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs Thu Aug 21 21:11:49 2014
@@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Threading;
namespace Apache.NMS.ActiveMQ.Threads
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1619590&r1=1619589&r2=1619590&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Aug 21 21:11:49 2014
@@ -19,6 +19,7 @@ using System;
using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
@@ -27,6 +28,7 @@ namespace Apache.NMS.ActiveMQ.Test
public class AMQRedeliveryPolicyTest : NMSTestSupport
{
private const string DESTINATION_NAME = "TEST.RedeliveryPolicyTestDest";
+ private const string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
[Test]
public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
@@ -443,5 +445,161 @@ namespace Apache.NMS.ActiveMQ.Test
session.Rollback();
}
}
+
+ /// <summary>
+ /// Sends one message and then receives it repeatedly through fresh connections
+ /// using transacted sessions that are never committed. The test expects the
+ /// redelivery counter to match the attempt number up to the configured maximum,
+ /// expects Receive to return null on the attempt after that, and finally expects
+ /// the message to be readable from the "ActiveMQ.DLQ" queue.
+ /// </summary>
+ [Test]
+ public void TestRepeatedRedeliveryReceiveNoCommit()
+ {
+     using(Connection connection = (Connection) CreateConnection())
+     {
+         connection.Start();
+
+         ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+         IDestination testQueue = dlqSession.GetQueue("TestRepeatedRedeliveryReceiveNoCommit");
+         IDestination deadLetterQueue = dlqSession.GetQueue("ActiveMQ.DLQ");
+
+         // Remove both destinations first, presumably to clear leftovers from earlier runs.
+         connection.DeleteDestination(testQueue);
+         connection.DeleteDestination(deadLetterQueue);
+
+         IMessageProducer producer = dlqSession.CreateProducer(testQueue);
+         producer.Send(dlqSession.CreateTextMessage("1st"));
+         IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(deadLetterQueue);
+
+         const int maxRedeliveries = 4;
+         int totalAttempts = maxRedeliveries + 2;
+         for (int attempt = 0; attempt < totalAttempts; ++attempt)
+         {
+             using(Connection loopConnection = (Connection) CreateConnection())
+             {
+                 // Configure the policy before the connection is started.
+                 IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy;
+                 policy.InitialRedeliveryDelay = 0;
+                 policy.UseExponentialBackOff = false;
+                 policy.MaximumRedeliveries = maxRedeliveries;
+
+                 loopConnection.Start();
+                 ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                 IMessageConsumer consumer = session.CreateConsumer(testQueue);
+
+                 IMessage raw = consumer.Receive(TimeSpan.FromMilliseconds(4000));
+                 ActiveMQTextMessage received = raw as ActiveMQTextMessage;
+                 if (received != null)
+                 {
+                     Tracer.DebugFormat("Received Message: {0} delivery count = {1}", received.Text, received.RedeliveryCounter);
+                 }
+
+                 if (attempt > maxRedeliveries)
+                 {
+                     // One attempt past the limit: nothing should be delivered.
+                     Assert.IsNull(received, "null on exceeding redelivery count");
+                 }
+                 else
+                 {
+                     Assert.IsNotNull(received);
+                     Assert.AreEqual("1st", received.Text);
+                     Assert.AreEqual(attempt, received.RedeliveryCounter);
+                 }
+
+                 // The session is never committed; the connection is disposed here.
+             }
+         }
+
+         // We should be able to get the message off the DLQ now.
+         ITextMessage deadLetter = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+         Assert.IsNotNull(deadLetter, "Got message from DLQ");
+         Assert.AreEqual("1st", deadLetter.Text);
+
+         string cause = deadLetter.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+         if (cause == null)
+         {
+             Tracer.Debug("DLQ'd message has no cause tag.");
+         }
+         else
+         {
+             Tracer.DebugFormat("Rollback Cause = {0}", cause);
+             Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref");
+         }
+     }
+ }
+
+ /// <summary>
+ /// Listener-based variant of the repeated redelivery check. One message is sent and
+ /// then consumed through a message listener on fresh connections whose transacted
+ /// sessions are never committed. The test expects the listener to be invoked on every
+ /// attempt up to the configured maximum, not to be invoked on the attempt after that,
+ /// and finally expects the message to be readable from the "ActiveMQ.DLQ" queue.
+ /// </summary>
+ [Test]
+ public void TestRepeatedRedeliveryOnMessageNoCommit()
+ {
+     using(Connection connection = (Connection) CreateConnection())
+     {
+         connection.Start();
+         ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+         IDestination testQueue = dlqSession.GetQueue("TestRepeatedRedeliveryOnMessageNoCommit");
+         IDestination deadLetterQueue = dlqSession.GetQueue("ActiveMQ.DLQ");
+
+         // Remove both destinations first, presumably to clear leftovers from earlier runs.
+         connection.DeleteDestination(testQueue);
+         connection.DeleteDestination(deadLetterQueue);
+
+         IMessageProducer producer = dlqSession.CreateProducer(testQueue);
+         IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(deadLetterQueue);
+
+         producer.Send(dlqSession.CreateTextMessage("1st"));
+
+         const int maxRedeliveries = 4;
+         // Shared across all callbacks so each one can check the expected redelivery counter.
+         Atomic<int> receivedCount = new Atomic<int>(0);
+
+         int totalAttempts = maxRedeliveries + 2;
+         for (int attempt = 0; attempt < totalAttempts; ++attempt)
+         {
+             using(Connection loopConnection = (Connection) CreateConnection())
+             {
+                 // Configure the policy before the connection is started.
+                 IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy;
+                 policy.InitialRedeliveryDelay = 0;
+                 policy.UseExponentialBackOff = false;
+                 policy.MaximumRedeliveries = maxRedeliveries;
+
+                 loopConnection.Start();
+                 ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional);
+                 IMessageConsumer consumer = session.CreateConsumer(testQueue);
+                 OnMessageNoCommitCallback callback = new OnMessageNoCommitCallback(receivedCount);
+                 consumer.Listener += new MessageListener(callback.consumer_Listener);
+
+                 bool dispatched = callback.Await();
+                 if (attempt > maxRedeliveries)
+                 {
+                     // final redelivery gets poisoned before dispatch
+                     Assert.IsFalse(dispatched, "listener should not have dispatched after max redliveries");
+                 }
+                 else
+                 {
+                     Assert.IsTrue(dispatched, "listener should have dispatched a message");
+                 }
+             }
+         }
+
+         // We should be able to get the message off the DLQ now.
+         ITextMessage deadLetter = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+         Assert.IsNotNull(deadLetter, "Got message from DLQ");
+         Assert.AreEqual("1st", deadLetter.Text);
+
+         string cause = deadLetter.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+         if (cause == null)
+         {
+             Tracer.Debug("DLQ'd message has no cause tag.");
+         }
+         else
+         {
+             Tracer.DebugFormat("Rollback Cause = {0}", cause);
+             Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref");
+         }
+     }
+ }
+
+ /// <summary>
+ /// Message listener helper that checks each delivered message against a shared
+ /// delivery counter and releases a latch once a message has been handled, so a
+ /// test can wait to see whether a dispatch happened.
+ /// </summary>
+ class OnMessageNoCommitCallback
+ {
+     // Counter supplied by the caller; compared with each message's redelivery counter.
+     private Atomic<int> deliveries;
+     // Released by the listener after it has processed one message.
+     private CountDownLatch dispatched = new CountDownLatch(1);
+
+     /// <summary>
+     /// Creates a callback that validates against, and advances, the given counter.
+     /// </summary>
+     /// <param name="receivedCount">Counter holding the expected redelivery count.</param>
+     public OnMessageNoCommitCallback(Atomic<int> receivedCount)
+     {
+         this.deliveries = receivedCount;
+     }
+
+     /// <summary>
+     /// Waits on the latch for up to 5000 milliseconds.
+     /// </summary>
+     /// <returns>The result of the latch wait.</returns>
+     public bool Await()
+     {
+         TimeSpan limit = TimeSpan.FromMilliseconds(5000);
+         return this.dispatched.await(limit);
+     }
+
+     /// <summary>
+     /// Listener entry point: asserts the message body and redelivery counter,
+     /// advances the shared counter by one and then releases the latch.
+     /// </summary>
+     /// <param name="message">The message handed to the listener.</param>
+     public void consumer_Listener(IMessage message)
+     {
+         ActiveMQTextMessage text = message as ActiveMQTextMessage;
+         Tracer.DebugFormat("Received Message: {0} delivery count = {1}", text.Text, text.RedeliveryCounter);
+
+         Assert.AreEqual("1st", text.Text);
+         Assert.AreEqual(this.deliveries.Value, text.RedeliveryCounter);
+
+         // Only reached when both assertions pass.
+         this.deliveries.GetAndSet(this.deliveries.Value + 1);
+         this.dispatched.countDown();
+     }
+ }
}
}