You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/20 00:44:29 UTC
svn commit: r1470081 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src:
main/csharp/MessageConsumer.cs main/csharp/Session.cs
test/csharp/OptimizedAckTest.cs
Author: tabish
Date: Fri Apr 19 22:44:28 2013
New Revision: 1470081
URL: http://svn.apache.org/r1470081
Log:
fixes for:
https://issues.apache.org/jira/browse/AMQNET-431
https://issues.apache.org/jira/browse/AMQNET-430
https://issues.apache.org/jira/browse/AMQNET-429
https://issues.apache.org/jira/browse/AMQNET-428
https://issues.apache.org/jira/browse/AMQNET-329
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1470081&r1=1470080&r2=1470081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Apr 19 22:44:28 2013
@@ -20,6 +20,7 @@ using System.Collections.Generic;
using System.Collections.Specialized;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ
@@ -52,6 +53,7 @@ namespace Apache.NMS.ActiveMQ
private int redeliveryTimeout = 500;
protected bool disposed = false;
private long lastDeliveredSequenceId = 0;
+ private int ackCounter = 0;
private int deliveredCounter = 0;
private int additionalWindowSize = 0;
private long redeliveryDelay = 0;
@@ -59,12 +61,21 @@ namespace Apache.NMS.ActiveMQ
private volatile bool synchronizationRegistered = false;
private bool clearDispatchList = false;
private bool inProgressClearRequiredFlag;
+ private bool optimizeAcknowledge;
+ private DateTime optimizeAckTimestamp = DateTime.Now;
+ private long optimizeAcknowledgeTimeOut = 0;
+ private long optimizedAckScheduledAckInterval = 0;
+ private Timer optimizedAckTimer;
+ private long failoverRedeliveryWaitPeriod = 0;
+ private bool transactedIndividualAck = false;
+ private bool nonBlockingRedelivery = false;
private Exception failureError;
private event MessageListener listener;
private IRedeliveryPolicy redeliveryPolicy;
+ private PreviouslyDeliveredMap previouslyDeliveredMessages;
// Constructor internal to prevent clients from creating an instance.
internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
@@ -140,6 +151,19 @@ namespace Apache.NMS.ActiveMQ
URISupport.SetProperties(this.info, options);
URISupport.SetProperties(this, customConsumerOptions, "nms.");
}
+
+ this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge &&
+ session.IsAutoAcknowledge && !this.info.Browser;
+
+ if (this.optimizeAcknowledge) {
+ this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
+ OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
+ }
+
+ this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
+ this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
+ this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
+ this.transactedIndividualAck = session.Connection.TransactedIndividualAck || this.nonBlockingRedelivery;
}
~MessageConsumer()
@@ -200,6 +224,74 @@ namespace Apache.NMS.ActiveMQ
set { this.failureError = value; }
}
+ public bool OptimizeAcknowledge
+ {
+ get { return this.optimizeAcknowledge; }
+ set
+ {
+ if (optimizeAcknowledge && !value)
+ {
+ DeliverAcks();
+ }
+ this.optimizeAcknowledge = value;
+ }
+ }
+
+ public long OptimizeAcknowledgeTimeOut
+ {
+ get { return this.optimizeAcknowledgeTimeOut; }
+ set { this.optimizeAcknowledgeTimeOut = value; }
+ }
+
+ public long OptimizedAckScheduledAckInterval
+ {
+ get { return this.optimizedAckScheduledAckInterval; }
+ set
+ {
+ this.optimizedAckScheduledAckInterval = value;
+
+ if (this.optimizedAckTimer != null)
+ {
+ AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+ this.optimizedAckTimer.Dispose(shutdownEvent);
+ if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(5000), false))
+ {
+ Tracer.WarnFormat("Consumer[{0}]: Optimized Ack Timer Task didn't shutdown properly.", this.info.ConsumerId);
+ }
+
+ this.optimizedAckTimer = null;
+ }
+
+ // Should we periodically send out all outstanding acks.
+ if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0)
+ {
+ this.optimizedAckTimer = new Timer(
+ new TimerCallback(DoOptimizedAck),
+ null,
+ optimizedAckScheduledAckInterval,
+ optimizedAckScheduledAckInterval);
+ }
+ }
+ }
+
+ public long FailoverRedeliveryWaitPeriod
+ {
+ get { return this.failoverRedeliveryWaitPeriod; }
+ set { this.failoverRedeliveryWaitPeriod = value; }
+ }
+
+ public bool TransactedIndividualAck
+ {
+ get { return this.transactedIndividualAck; }
+ set { this.transactedIndividualAck = value; }
+ }
+
+ public bool NonBlockingRedelivery
+ {
+ get { return this.nonBlockingRedelivery; }
+ set { this.nonBlockingRedelivery = value; }
+ }
+
#endregion
#region IMessageConsumer Members
@@ -408,6 +500,29 @@ namespace Apache.NMS.ActiveMQ
}
}
+ if (this.optimizedAckTimer != null)
+ {
+ this.OptimizedAckScheduledAckInterval = 0;
+ }
+
+ if (this.session.IsClientAcknowledge)
+ {
+ if (!this.info.Browser)
+ {
+ // rollback duplicates that aren't acknowledged
+ LinkedList<MessageDispatch> temp = null;
+ lock(this.dispatchedMessages)
+ {
+ temp = new LinkedList<MessageDispatch>(this.dispatchedMessages);
+ }
+ foreach (MessageDispatch old in temp)
+ {
+ this.session.Connection.RollbackDuplicate(this, old.Message);
+ }
+ temp.Clear();
+ }
+ }
+
if(!this.session.IsTransacted)
{
lock(this.dispatchedMessages)
@@ -419,6 +534,15 @@ namespace Apache.NMS.ActiveMQ
this.session.RemoveConsumer(this);
this.unconsumedMessages.Close();
+ MessageDispatch[] unconsumed = unconsumedMessages.RemoveAll();
+ if (!this.info.Browser)
+ {
+ foreach (MessageDispatch old in unconsumed)
+ {
+ // ensure we don't filter this as a duplicate
+ session.Connection.RollbackDuplicate(this, old.Message);
+ }
+ }
if(Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
@@ -470,14 +594,7 @@ namespace Apache.NMS.ActiveMQ
return;
}
- MessageAck ack = new MessageAck();
-
- ack.AckType = (byte) AckType.IndividualAck;
- ack.ConsumerId = this.info.ConsumerId;
- ack.Destination = dispatch.Destination;
- ack.LastMessageId = dispatch.Message.MessageId;
- ack.MessageCount = 1;
-
+ MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
this.session.SendAck(ack);
}
@@ -511,39 +628,6 @@ namespace Apache.NMS.ActiveMQ
this.unconsumedMessages.Stop();
}
- internal void InProgressClearRequired()
- {
- inProgressClearRequiredFlag = true;
- // deal with delivered messages async to avoid lock contention with in progress acks
- clearDispatchList = true;
- }
-
- internal void ClearMessagesInProgress()
- {
- if(inProgressClearRequiredFlag)
- {
- // Called from a thread in the ThreadPool, so we wait until we can
- // get a lock on the unconsumed list then we clear it.
- lock(this.unconsumedMessages)
- {
- if(inProgressClearRequiredFlag)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
- this.unconsumedMessages.Count + ") on transport interrupt");
- }
-
- this.unconsumedMessages.Clear();
-
- // allow dispatch on this connection to resume
- this.session.Connection.TransportInterruptionProcessingComplete();
- this.inProgressClearRequiredFlag = false;
- }
- }
- }
- }
-
public void DeliverAcks()
{
MessageAck ack = null;
@@ -554,11 +638,12 @@ namespace Apache.NMS.ActiveMQ
{
lock(this.dispatchedMessages)
{
- ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
+ ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
if(ack != null)
{
Tracer.Debug("Consumer - DeliverAcks clearing the Dispatch list");
this.dispatchedMessages.Clear();
+ this.ackCounter = 0;
}
else
{
@@ -575,16 +660,7 @@ namespace Apache.NMS.ActiveMQ
if(ack != null)
{
- MessageAck ackToSend = ack;
-
- try
- {
- this.session.SendAck(ackToSend);
- }
- catch(Exception e)
- {
- Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
- }
+ this.session.SendAck(ack);
}
else
{
@@ -593,42 +669,175 @@ namespace Apache.NMS.ActiveMQ
}
}
- public virtual void Dispatch(MessageDispatch dispatch)
+ internal void InProgressClearRequired()
{
- MessageListener listener = this.listener;
- bool dispatchMessage = false;
+ inProgressClearRequiredFlag = true;
+ // deal with delivered messages async to avoid lock contention with in progress acks
+ clearDispatchList = true;
+ }
- try
+ internal void ClearMessagesInProgress()
+ {
+ if(inProgressClearRequiredFlag)
{
+ // Called from a thread in the ThreadPool, so we wait until we can
+ // get a lock on the unconsumed list then we clear it.
lock(this.unconsumedMessages.SyncRoot)
{
- if(this.clearDispatchList)
+ if(inProgressClearRequiredFlag)
{
- // we are reconnecting so lets flush the in progress messages
- this.clearDispatchList = false;
- this.unconsumedMessages.Clear();
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+ this.unconsumedMessages.Count + ") on transport interrupt");
+ }
- if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+ // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
+ MessageDispatch[] list = this.unconsumedMessages.RemoveAll();
+ if (!this.info.Browser)
{
- // on resumption a pending delivered ack will be out of sync with
- // re-deliveries.
- if(Tracer.IsDebugEnabled)
+ foreach (MessageDispatch old in list)
{
- Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
- }
- this.pendingAck = null;
- }
+ session.Connection.RollbackDuplicate(this, old.Message);
+ }
+ }
+
+ // allow dispatch on this connection to resume
+ this.session.Connection.TransportInterruptionProcessingComplete();
+ this.inProgressClearRequiredFlag = false;
}
+ }
+ }
+ }
+
+ private void ClearDispatchList()
+ {
+ if (this.clearDispatchList)
+ {
+ lock(this.dispatchedMessages)
+ {
+ if (this.clearDispatchList)
+ {
+ if (dispatchedMessages.Count != 0)
+ {
+ if (session.IsTransacted)
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
+ this.info.ConsumerId, dispatchedMessages.Count);
+ }
+ if (previouslyDeliveredMessages == null)
+ {
+ previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.TransactionContext.TransactionId);
+ }
+ foreach (MessageDispatch delivered in dispatchedMessages)
+ {
+ this.previouslyDeliveredMessages.Add(delivered.Message.MessageId, false);
+ }
+ }
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
+ this.info.ConsumerId, dispatchedMessages.Count);
+ }
+ this.dispatchedMessages.Clear();
+ this.pendingAck = null;
+ }
+ }
+ this.clearDispatchList = false;
+ }
+ }
+ }
+ }
+
+ public virtual void Dispatch(MessageDispatch dispatch)
+ {
+ MessageListener listener = this.listener;
+ bool dispatchMessage = false;
+ try
+ {
+ ClearMessagesInProgress();
+ ClearDispatchList();
+
+ lock(this.unconsumedMessages.SyncRoot)
+ {
if(!this.unconsumedMessages.Closed)
{
- if(listener != null && this.unconsumedMessages.Running)
+ if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
{
- dispatchMessage = true;
+ if(listener != null && this.unconsumedMessages.Running)
+ {
+ dispatchMessage = true;
+ }
+ else
+ {
+ if (!this.unconsumedMessages.Running)
+ {
+ // delayed redelivery, ensure it can be re delivered
+ session.Connection.RollbackDuplicate(this, dispatch.Message);
+ }
+ this.unconsumedMessages.Enqueue(dispatch);
+ }
}
- else
+ else
{
- this.unconsumedMessages.Enqueue(dispatch);
+ if (!this.session.IsTransacted)
+ {
+ Tracer.Warn("Duplicate dispatch on connection: " + session.Connection.ConnectionId +
+ " to consumer: " + ConsumerId + ", ignoring (auto acking) duplicate: " + dispatch);
+ MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+ session.SendAck(ack);
+ }
+ else
+ {
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Consumer[{0}]: tracking transacted redelivery of duplicate: {1}",
+ this.info.ConsumerId, dispatch.Message);
+ }
+ bool needsPoisonAck = false;
+ lock(this.dispatchedMessages)
+ {
+ if (previouslyDeliveredMessages != null)
+ {
+ previouslyDeliveredMessages.Add(dispatch.Message.MessageId, true);
+ }
+ else
+ {
+ // delivery while pending redelivery to another consumer on the same connection
+ // not waiting for redelivery will help here
+ needsPoisonAck = true;
+ }
+ }
+ if (needsPoisonAck)
+ {
+ MessageAck poisonAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
+ poisonAck.FirstMessageId = dispatch.Message.MessageId;
+ BrokerError cause = new BrokerError();
+ cause.ExceptionClass = "javax.jms.JMSException";
+ cause.Message = "Duplicate dispatch with transacted redeliver pending on another consumer, connection: " +
+ session.Connection.ConnectionId;
+ Tracer.Warn("Acking duplicate delivery as poison, redelivery must be pending to another" +
+ " consumer on this connection, failoverRedeliveryWaitPeriod=" +
+ failoverRedeliveryWaitPeriod + ". Message: " + dispatch + ", poisonAck: " + poisonAck);
+ this.session.SendAck(poisonAck);
+ }
+ else
+ {
+ if (transactedIndividualAck)
+ {
+ ImmediateIndividualTransactedAck(dispatch);
+ }
+ else
+ {
+ this.session.SendAck(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1));
+ }
+ }
+ }
}
}
}
@@ -654,7 +863,8 @@ namespace Apache.NMS.ActiveMQ
{
if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
{
- // Redeliver the message
+ // Schedule redelivery and possible dlq processing
+ dispatch.RollbackCause = e;
Rollback();
}
else
@@ -826,11 +1036,40 @@ namespace Apache.NMS.ActiveMQ
if(this.session.IsTransacted)
{
- this.AckLater(dispatch, AckType.DeliveredAck);
+ if (this.transactedIndividualAck)
+ {
+ ImmediateIndividualTransactedAck(dispatch);
+ }
+ else
+ {
+ this.AckLater(dispatch, AckType.DeliveredAck);
+ }
}
}
}
+ private bool IsOptimizedAckTime()
+ {
+ // evaluate both expired and normal msgs as otherwise consumer may get stalled
+ if (ackCounter + deliveredCounter >= (this.info.PrefetchSize * .65))
+ {
+ return true;
+ }
+
+ if (optimizeAcknowledgeTimeOut > 0)
+ {
+ DateTime deadline = optimizeAckTimestamp +
+ TimeSpan.FromMilliseconds(optimizeAcknowledgeTimeOut);
+
+ if (DateTime.Now >= deadline)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
{
if(this.unconsumedMessages.Closed)
@@ -845,7 +1084,7 @@ namespace Apache.NMS.ActiveMQ
this.dispatchedMessages.Remove(dispatch);
}
- AckLater(dispatch, AckType.DeliveredAck);
+ Acknowledge(dispatch, AckType.DeliveredAck);
}
else
{
@@ -861,12 +1100,42 @@ namespace Apache.NMS.ActiveMQ
{
if(this.dispatchedMessages.Count != 0)
{
- MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
- if(ack != null)
+ if (this.optimizeAcknowledge)
{
- this.dispatchedMessages.Clear();
- this.session.SendAck(ack);
- }
+ this.ackCounter++;
+
+ if (IsOptimizedAckTime())
+ {
+ MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+ if (ack != null)
+ {
+ this.dispatchedMessages.Clear();
+ this.ackCounter = 0;
+ this.session.SendAck(ack);
+ this.optimizeAckTimestamp = DateTime.Now;
+ }
+ // as further optimization send ack for expired msgs wehn
+ // there are any. This resets the deliveredCounter to 0 so
+ // that we won't sent standard acks with every msg just
+ // because the deliveredCounter just below 0.5 * prefetch
+ // as used in ackLater()
+ if (this.pendingAck != null && this.deliveredCounter > 0)
+ {
+ this.session.SendAck(pendingAck);
+ this.pendingAck = null;
+ this.deliveredCounter = 0;
+ }
+ }
+ }
+ else
+ {
+ MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+ if (ack != null)
+ {
+ this.dispatchedMessages.Clear();
+ this.session.SendAck(ack);
+ }
+ }
}
}
this.deliveringAcks.Value = false;
@@ -907,15 +1176,8 @@ namespace Apache.NMS.ActiveMQ
}
MessageDispatch dispatch = this.dispatchedMessages.First.Value;
- MessageAck ack = new MessageAck();
-
- ack.AckType = (byte) type;
- ack.ConsumerId = this.info.ConsumerId;
- ack.Destination = dispatch.Destination;
- ack.LastMessageId = dispatch.Message.MessageId;
- ack.MessageCount = this.dispatchedMessages.Count;
+ MessageAck ack = new MessageAck(dispatch, (byte) type, this.dispatchedMessages.Count);
ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
-
return ack;
}
}
@@ -926,27 +1188,14 @@ namespace Apache.NMS.ActiveMQ
// consumer got the message to expand the pre-fetch window
if(this.session.IsTransacted)
{
- this.session.DoStartTransaction();
-
- if(!synchronizationRegistered)
- {
- Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
- this.info.ConsumerId);
- this.synchronizationRegistered = true;
- this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
- }
+ RegisterSync();
}
this.deliveredCounter++;
MessageAck oldPendingAck = pendingAck;
- pendingAck = new MessageAck();
- pendingAck.AckType = (byte) type;
- pendingAck.ConsumerId = this.info.ConsumerId;
- pendingAck.Destination = dispatch.Destination;
- pendingAck.LastMessageId = dispatch.Message.MessageId;
- pendingAck.MessageCount = deliveredCounter;
+ pendingAck = new MessageAck(dispatch, (byte) type, deliveredCounter);
if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
{
@@ -972,7 +1221,7 @@ namespace Apache.NMS.ActiveMQ
Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
}
- this.session.Connection.Oneway(oldPendingAck);
+ this.session.SendAck(oldPendingAck);
}
else
{
@@ -983,17 +1232,59 @@ namespace Apache.NMS.ActiveMQ
}
}
- if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
+ // evaluate both expired and normal msgs as otherwise consumer may get stalled
+ if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
{
- this.session.Connection.Oneway(pendingAck);
+ this.session.SendAck(pendingAck);
this.pendingAck = null;
this.deliveredCounter = 0;
this.additionalWindowSize = 0;
}
}
+ private void ImmediateIndividualTransactedAck(MessageDispatch dispatch)
+ {
+ // acks accumulate on the broker pending transaction completion to indicate
+ // delivery status
+ RegisterSync();
+ MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+ ack.TransactionId = session.TransactionContext.TransactionId;
+ this.session.Connection.SyncRequest(ack);
+ }
+
+ private void RegisterSync()
+ {
+ // Don't acknowledge now, but we may need to let the broker know the
+ // consumer got the message to expand the pre-fetch window
+ if(this.session.IsTransacted)
+ {
+ this.session.DoStartTransaction();
+
+ if(!synchronizationRegistered)
+ {
+ Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
+ this.info.ConsumerId);
+ this.synchronizationRegistered = true;
+ this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
+ }
+ }
+ }
+
+ private void Acknowledge(MessageDispatch dispatch, AckType ackType)
+ {
+ MessageAck ack = new MessageAck(dispatch, (byte) ackType, 1);
+ this.session.SendAck(ack);
+ lock(this.dispatchedMessages)
+ {
+ dispatchedMessages.Remove(dispatch);
+ }
+ }
+
internal void Acknowledge()
{
+ ClearDispatchList();
+ WaitForRedeliveries();
+
lock(this.dispatchedMessages)
{
// Acknowledge all messages so far.
@@ -1006,6 +1297,7 @@ namespace Apache.NMS.ActiveMQ
if(this.session.IsTransacted)
{
+ RollbackOnFailedRecoveryRedelivery();
if (!this.session.TransactionContext.InTransaction)
{
this.session.DoStartTransaction();
@@ -1032,6 +1324,7 @@ namespace Apache.NMS.ActiveMQ
lock(this.dispatchedMessages)
{
this.dispatchedMessages.Clear();
+ ClearPreviouslyDelivered();
}
this.redeliveryDelay = 0;
@@ -1041,8 +1334,26 @@ namespace Apache.NMS.ActiveMQ
{
lock(this.unconsumedMessages.SyncRoot)
{
+ if (this.optimizeAcknowledge)
+ {
+ // remove messages read but not acked at the broker yet through optimizeAcknowledge
+ if (!this.info.Browser)
+ {
+ lock(this.dispatchedMessages)
+ {
+ for (int i = 0; (i < this.dispatchedMessages.Count) && (i < ackCounter); i++)
+ {
+ // ensure we don't filter this as a duplicate
+ MessageDispatch dispatch = this.dispatchedMessages.Last.Value;
+ this.dispatchedMessages.RemoveLast();
+ session.Connection.RollbackDuplicate(this, dispatch.Message);
+ }
+ }
+ }
+ }
lock(this.dispatchedMessages)
{
+ RollbackPreviouslyDeliveredAndNotRedelivered();
if(this.dispatchedMessages.Count == 0)
{
Tracer.DebugFormat("Consumer {0} Rolled Back, no dispatched Messages",
@@ -1062,20 +1373,24 @@ namespace Apache.NMS.ActiveMQ
{
// Allow the message to update its internal to reflect a Rollback.
dispatch.Message.OnMessageRollback();
+ // ensure we don't filter this as a duplicate
+ session.Connection.RollbackDuplicate(this, dispatch.Message);
}
if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
{
// We need to NACK the messages so that they get sent to the DLQ.
- MessageAck ack = new MessageAck();
-
- ack.AckType = (byte) AckType.PoisonAck;
- ack.ConsumerId = this.info.ConsumerId;
- ack.Destination = lastMd.Destination;
- ack.LastMessageId = lastMd.Message.MessageId;
- ack.MessageCount = this.dispatchedMessages.Count;
- ack.FirstMessageId = firstMsgId;
+ MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, dispatchedMessages.Count);
+
+ if (lastMd.RollbackCause != null)
+ {
+ BrokerError cause = new BrokerError();
+ cause.ExceptionClass = "javax.jms.JMSException";
+ cause.Message = lastMd.RollbackCause.Message;
+ ack.PoisonCause = cause;
+ }
+ ack.FirstMessageId = firstMsgId;
this.session.SendAck(ack);
@@ -1083,21 +1398,16 @@ namespace Apache.NMS.ActiveMQ
additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
this.redeliveryDelay = 0;
+ this.deliveredCounter -= this.dispatchedMessages.Count;
+ this.dispatchedMessages.Clear();
}
else
{
// We only send a RedeliveryAck after the first redelivery
if(currentRedeliveryCount > 0)
{
- MessageAck ack = new MessageAck();
-
- ack.AckType = (byte) AckType.RedeliveredAck;
- ack.ConsumerId = this.info.ConsumerId;
- ack.Destination = lastMd.Destination;
- ack.LastMessageId = lastMd.Message.MessageId;
- ack.MessageCount = this.dispatchedMessages.Count;
+ MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, dispatchedMessages.Count);
ack.FirstMessageId = firstMsgId;
-
this.session.SendAck(ack);
}
@@ -1115,6 +1425,9 @@ namespace Apache.NMS.ActiveMQ
this.unconsumedMessages.EnqueueFirst(dispatch);
}
+ this.deliveredCounter -= this.dispatchedMessages.Count;
+ this.dispatchedMessages.Clear();
+
if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
{
DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
@@ -1125,9 +1438,6 @@ namespace Apache.NMS.ActiveMQ
Start();
}
}
-
- this.deliveredCounter -= this.dispatchedMessages.Count;
- this.dispatchedMessages.Clear();
}
}
@@ -1237,6 +1547,144 @@ namespace Apache.NMS.ActiveMQ
return this.info.Destination.Equals(dest);
}
+ private void DoOptimizedAck(object state)
+ {
+ DeliverAcks();
+ }
+
+ private void WaitForRedeliveries()
+ {
+ if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null)
+ {
+ DateTime expiry = DateTime.Now + TimeSpan.FromMilliseconds(failoverRedeliveryWaitPeriod);
+ int numberNotReplayed;
+ do
+ {
+ numberNotReplayed = 0;
+ lock(this.dispatchedMessages)
+ {
+ if (previouslyDeliveredMessages != null)
+ {
+ foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
+ {
+ if (!entry.Value)
+ {
+ numberNotReplayed++;
+ }
+ }
+ }
+ }
+ if (numberNotReplayed > 0)
+ {
+ Tracer.Info("waiting for redelivery of " + numberNotReplayed + " in transaction: " +
+ previouslyDeliveredMessages.TransactionId + ", to consumer :" +
+ this.info.ConsumerId);
+ Thread.Sleep((int) Math.Max(500, failoverRedeliveryWaitPeriod/4));
+ }
+ }
+ while (numberNotReplayed > 0 && expiry < DateTime.Now);
+ }
+ }
+
+ // called with deliveredMessages locked
+ private void RollbackOnFailedRecoveryRedelivery()
+ {
+ if (previouslyDeliveredMessages != null)
+ {
+ // if any previously delivered messages was not re-delivered, transaction is
+ // invalid and must rollback as messages have been dispatched else where.
+ int numberNotReplayed = 0;
+ foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
+ {
+ if (!entry.Value)
+ {
+ numberNotReplayed++;
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("previously delivered message has not been replayed in transaction: " +
+ previouslyDeliveredMessages.TransactionId + " , messageId: " + entry.Key);
+ }
+ }
+ }
+
+ if (numberNotReplayed > 0)
+ {
+ String message = "rolling back transaction (" +
+ previouslyDeliveredMessages.TransactionId + ") post failover recovery. " + numberNotReplayed +
+ " previously delivered message(s) not replayed to consumer: " + this.info.ConsumerId;
+ Tracer.Warn(message);
+ throw new TransactionRolledBackException(message);
+ }
+ }
+ }
+
+ // called with unconsumedMessages && dispatchedMessages locked
+ // remove any message not re-delivered as they can't be replayed to this
+ // consumer on rollback
+ private void RollbackPreviouslyDeliveredAndNotRedelivered()
+ {
+ if (previouslyDeliveredMessages != null)
+ {
+ foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
+ {
+ if (!entry.Value)
+ {
+ RemoveFromDeliveredMessages(entry.Key);
+ }
+ }
+
+ ClearPreviouslyDelivered();
+ }
+ }
+
+ // Must be called with dispatchedMessages locked
+ private void RemoveFromDeliveredMessages(MessageId key)
+ {
+ MessageDispatch toRemove = null;
+ foreach(MessageDispatch candidate in this.dispatchedMessages)
+ {
+ if (candidate.Message.MessageId.Equals(key))
+ {
+ session.Connection.RollbackDuplicate(this, candidate.Message);
+ toRemove = candidate;
+ break;
+ }
+ }
+
+ if (toRemove != null)
+ {
+ this.dispatchedMessages.Remove(toRemove);
+ }
+ }
+
+ // called with deliveredMessages locked
+ private void ClearPreviouslyDelivered()
+ {
+ if (previouslyDeliveredMessages != null)
+ {
+ previouslyDeliveredMessages.Clear();
+ previouslyDeliveredMessages = null;
+ }
+ }
+
+ #region Transaction Redelivery Tracker
+
+ class PreviouslyDeliveredMap : Dictionary<MessageId, bool>
+ {
+ private TransactionId transactionId;
+ public TransactionId TransactionId
+ {
+ get { return this.transactionId; }
+ }
+
+ public PreviouslyDeliveredMap(TransactionId transactionId) : base()
+ {
+ this.transactionId = transactionId;
+ }
+ }
+
+ #endregion
+
#region Nested ISyncronization Types
class MessageConsumerSynchronization : ISynchronization
@@ -1252,7 +1700,21 @@ namespace Apache.NMS.ActiveMQ
{
Tracer.DebugFormat("MessageConsumerSynchronization - BeforeEnd Called for Consumer {0}.",
this.consumer.ConsumerId);
- this.consumer.Acknowledge();
+
+ if (this.consumer.TransactedIndividualAck)
+ {
+ this.consumer.ClearDispatchList();
+ this.consumer.WaitForRedeliveries();
+ lock(this.consumer.dispatchedMessages)
+ {
+ this.consumer.RollbackOnFailedRecoveryRedelivery();
+ }
+ }
+ else
+ {
+ this.consumer.Acknowledge();
+ }
+
this.consumer.synchronizationRegistered = false;
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1470081&r1=1470080&r2=1470081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Apr 19 22:44:28 2013
@@ -966,6 +966,11 @@ namespace Apache.NMS.ActiveMQ
internal void SendAck(MessageAck ack, bool lazy)
{
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Session sending Ack: " + ack);
+ }
+
if(lazy || connection.SendAcksAsync || this.IsTransacted )
{
this.connection.Oneway(ack);
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs?rev=1470081&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs Fri Apr 19 22:44:28 2013
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ [TestFixture]
+ public class OptimizedAckTest : NMSTestSupport
+ {
+ private Connection connection;
+ private int counter;
+
+ [SetUp]
+ public override void SetUp()
+ {
+ connection = (Connection) CreateConnection();
+ connection.OptimizeAcknowledge = true;
+ connection.OptimizeAcknowledgeTimeOut = 0;
+ connection.PrefetchPolicy.All = 100;
+
+ counter = 0;
+ }
+
+ [TearDown]
+ public override void TearDown()
+ {
+ if(this.connection != null)
+ {
+ this.connection.Close();
+ this.connection = null;
+ }
+
+ base.TearDown();
+ }
+
+ [Test]
+ public void TestOptimizedAckWithExpiredMsgs()
+ {
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IDestination destination = session.GetQueue("TestOptimizedAckWithExpiredMsgs");
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ IMessageProducer producer = session.CreateProducer(destination);
+ producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+ ITextMessage message;
+
+ // Produce msgs that will expire quickly
+ for (int i = 0; i < 45; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(200));
+ }
+
+ // Produce msgs that don't expire
+ for (int i=0; i < 60; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(60000));
+ }
+
+ Thread.Sleep(1000); // let the batch of 45 expire.
+
+ consumer.Listener += OnMessage;
+ connection.Start();
+
+ for (int i = 0; i < 60; ++i)
+ {
+ if (counter == 60)
+ {
+ break;
+ }
+ Thread.Sleep(1000);
+ }
+
+ Assert.AreEqual(60, counter, "Failed to receive all expected messages");
+
+ // Cleanup
+ producer.Close();
+ consumer.Close();
+ session.Close();
+ connection.Close();
+ }
+
+ [Test]
+ public void TestOptimizedAckWithExpiredMsgsSync()
+ {
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IDestination destination = session.GetQueue("TestOptimizedAckWithExpiredMsgs");
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ IMessageProducer producer = session.CreateProducer(destination);
+ producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+ ITextMessage message;
+
+ // Produce msgs that will expire quickly
+ for (int i = 0; i < 45; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(200));
+ }
+
+ // Produce msgs that don't expire
+ for (int i=0; i < 60; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(60000));
+ }
+
+ Thread.Sleep(1000);
+ connection.Start();
+
+ int counter = 0;
+ for (; counter < 60; ++counter)
+ {
+ Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+ }
+
+ Assert.AreEqual(60, counter, "Failed to receive all expected messages");
+ Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(2000)));
+
+ // Cleanup
+ producer.Close();
+ consumer.Close();
+ session.Close();
+ connection.Close();
+ }
+
+ [Test]
+ public void testOptimizedAckWithExpiredMsgsSync2()
+ {
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IDestination destination = session.GetQueue("TestOptimizedAckWithExpiredMsgs");
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ IMessageProducer producer = session.CreateProducer(destination);
+ producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+ ITextMessage message;
+
+ // Produce msgs that don't expire
+ for (int i = 0; i < 56; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(60000));
+ }
+
+ // Produce msgs that will expire quickly
+ for (int i=0; i<44; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(200));
+ }
+
+ // Produce some moremsgs that don't expire
+ for (int i=0; i<4; i++)
+ {
+ message = session.CreateTextMessage();
+ producer.Send(message,
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(60000));
+ }
+
+ Thread.Sleep(1000);
+ connection.Start();
+
+ int counter = 0;
+ for (; counter < 60; ++counter)
+ {
+ Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+ }
+
+ Assert.AreEqual(60, counter, "Failed to receive all expected messages");
+ Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(2000)));
+
+ // Cleanup
+ producer.Close();
+ consumer.Close();
+ session.Close();
+ connection.Close();
+ }
+
+ private void OnMessage(IMessage msg)
+ {
+ counter++;
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs
------------------------------------------------------------------------------
svn:eol-style = native