You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/28 17:54:35 UTC
svn commit: r1621155 [3/4] - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x: ./
src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Util/
src/test/csharp/Transport/failover/
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs?rev=1621155&r1=1621154&r2=1621155&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/MessageConsumer.cs Thu Aug 28 15:54:34 2014
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
using System.Threading;
using System.Collections.Generic;
@@ -25,69 +26,71 @@ using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ
{
- public enum AckType
- {
- DeliveredAck = 0, // Message delivered but not consumed
- PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
- ConsumedAck = 2, // Message consumed, discard
- RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
- IndividualAck = 4 // Only the given message is to be treated as consumed.
- }
-
- /// <summary>
- /// An object capable of receiving messages from some destination
- /// </summary>
- public class MessageConsumer : IMessageConsumer, IDispatcher
- {
+ public enum AckType
+ {
+ DeliveredAck = 0, // Message delivered but not consumed
+ PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
+ ConsumedAck = 2, // Message consumed, discard
+ RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned.
+ IndividualAck = 4, // Only the given message is to be treated as consumed.
+ UnmatchedAck = 5, // Case where durable topic subscription does not match selector
+ ExpiredAck = 6 // Case where message has expired before being dispatched to a consumer.
+ }
+
+ /// <summary>
+ /// An object capable of receiving messages from some destination
+ /// </summary>
+ public class MessageConsumer : IMessageConsumer, IDispatcher
+ {
private const int NO_MAXIMUM_REDELIVERIES = -1;
private readonly MessageTransformation messageTransformation;
private readonly MessageDispatchChannel unconsumedMessages;
- private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+ private readonly LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
private readonly ConsumerInfo info;
private readonly Session session;
- private MessageAck pendingAck = null;
+ private MessageAck pendingAck = null;
- private readonly Atomic<bool> started = new Atomic<bool>();
- private readonly Atomic<bool> deliveringAcks = new Atomic<bool>();
+ private readonly Atomic<bool> started = new Atomic<bool>();
+ private readonly Atomic<bool> deliveringAcks = new Atomic<bool>();
- private int redeliveryTimeout = 500;
- protected bool disposed = false;
- private long lastDeliveredSequenceId = 0;
- private int ackCounter = 0;
- private int deliveredCounter = 0;
- private int additionalWindowSize = 0;
- private long redeliveryDelay = 0;
- private int dispatchedCount = 0;
- private volatile bool synchronizationRegistered = false;
- private bool clearDispatchList = false;
- private bool inProgressClearRequiredFlag;
- private bool optimizeAcknowledge;
- private DateTime optimizeAckTimestamp = DateTime.Now;
- private long optimizeAcknowledgeTimeOut = 0;
- private long optimizedAckScheduledAckInterval = 0;
- private WaitCallback optimizedAckTask = null;
- private long failoverRedeliveryWaitPeriod = 0;
- private bool transactedIndividualAck = false;
- private bool nonBlockingRedelivery = false;
+ private int redeliveryTimeout = 500;
+ protected bool disposed = false;
+ private long lastDeliveredSequenceId = 0;
+ private int ackCounter = 0;
+ private int deliveredCounter = 0;
+ private int additionalWindowSize = 0;
+ private long redeliveryDelay = 0;
+ private int dispatchedCount = 0;
+ private volatile bool synchronizationRegistered = false;
+ private bool clearDeliveredList = false;
+ private bool inProgressClearRequiredFlag;
+ private bool optimizeAcknowledge;
+ private DateTime optimizeAckTimestamp = DateTime.Now;
+ private long optimizeAcknowledgeTimeOut = 0;
+ private long optimizedAckScheduledAckInterval = 0;
+ private WaitCallback optimizedAckTask = null;
+ private long failoverRedeliveryWaitPeriod = 0;
+ private bool transactedIndividualAck = false;
+ private bool nonBlockingRedelivery = false;
private Exception failureError;
- private ThreadPoolExecutor executor;
+ private ThreadPoolExecutor executor;
- private event MessageListener listener;
+ private event MessageListener listener;
- private IRedeliveryPolicy redeliveryPolicy;
- private PreviouslyDeliveredMap previouslyDeliveredMessages;
+ private IRedeliveryPolicy redeliveryPolicy;
+ private PreviouslyDeliveredMap previouslyDeliveredMessages;
- // Constructor internal to prevent clients from creating an instance.
- internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
- String name, String selector, int prefetch, int maxPendingMessageCount,
- bool noLocal, bool browser, bool dispatchAsync )
- {
- if(destination == null)
- {
- throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ // Constructor internal to prevent clients from creating an instance.
+ internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
+ String name, String selector, int prefetch, int maxPendingMessageCount,
+ bool noLocal, bool browser, bool dispatchAsync )
+ {
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
}
else if(destination.PhysicalName == null)
{
@@ -101,126 +104,126 @@ namespace Apache.NMS.ActiveMQ
{
throw new InvalidDestinationException("Physical name of Destination should be valid: " + destination);
}
-
+
String connectionID = session.Connection.ConnectionId.Value;
if(physicalName.IndexOf(connectionID) < 0)
{
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
}
-
+
if(!session.Connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
{
throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
}
}
- this.session = session;
- this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
- this.messageTransformation = this.session.Connection.MessageTransformation;
-
- if(session.Connection.MessagePrioritySupported)
- {
- this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
- }
- else
- {
- this.unconsumedMessages = new FifoMessageDispatchChannel();
- }
-
- this.info = new ConsumerInfo();
- this.info.ConsumerId = id;
- this.info.Destination = destination;
- this.info.SubscriptionName = name;
- this.info.Selector = selector;
- this.info.PrefetchSize = prefetch;
- this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
- this.info.NoLocal = noLocal;
- this.info.Browser = browser;
- this.info.DispatchAsync = dispatchAsync;
- this.info.Retroactive = session.Retroactive;
- this.info.Exclusive = session.Exclusive;
- this.info.Priority = session.Priority;
- this.info.ClientId = session.Connection.ClientId;
-
- // If the destination contained a URI query, then use it to set public properties
- // on the ConsumerInfo
- if(destination.Options != null)
- {
- // Get options prefixed with "consumer.*"
- StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
- // Extract out custom extension options "consumer.nms.*"
- StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
-
- URISupport.SetProperties(this.info, options);
- URISupport.SetProperties(this, customConsumerOptions, "nms.");
- }
-
- this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge &&
- session.IsAutoAcknowledge && !this.info.Browser;
-
- if (this.optimizeAcknowledge) {
- this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
- OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
- }
-
- this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
- this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
- this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
- this.transactedIndividualAck = session.Connection.TransactedIndividualAck || this.nonBlockingRedelivery;
- }
-
- ~MessageConsumer()
- {
- Dispose(false);
- }
-
- #region Property Accessors
-
- public long LastDeliveredSequenceId
- {
- get { return this.lastDeliveredSequenceId; }
- }
-
- public ConsumerId ConsumerId
- {
- get { return this.info.ConsumerId; }
- }
-
- public ConsumerInfo ConsumerInfo
- {
- get { return this.info; }
- }
-
- public int RedeliveryTimeout
- {
- get { return redeliveryTimeout; }
- set { redeliveryTimeout = value; }
- }
-
- public int PrefetchSize
- {
- get { return this.info.PrefetchSize; }
- }
-
- public IRedeliveryPolicy RedeliveryPolicy
- {
- get { return this.redeliveryPolicy; }
- set { this.redeliveryPolicy = value; }
- }
-
- public long UnconsumedMessageCount
- {
- get { return this.unconsumedMessages.Count; }
- }
-
- // Custom Options
- private bool ignoreExpiration = false;
- public bool IgnoreExpiration
- {
- get { return ignoreExpiration; }
- set { ignoreExpiration = value; }
- }
+ this.session = session;
+ this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+ this.messageTransformation = this.session.Connection.MessageTransformation;
+
+ if(session.Connection.MessagePrioritySupported)
+ {
+ this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
+ }
+ else
+ {
+ this.unconsumedMessages = new FifoMessageDispatchChannel();
+ }
+
+ this.info = new ConsumerInfo();
+ this.info.ConsumerId = id;
+ this.info.Destination = destination;
+ this.info.SubscriptionName = name;
+ this.info.Selector = selector;
+ this.info.PrefetchSize = prefetch;
+ this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
+ this.info.NoLocal = noLocal;
+ this.info.Browser = browser;
+ this.info.DispatchAsync = dispatchAsync;
+ this.info.Retroactive = session.Retroactive;
+ this.info.Exclusive = session.Exclusive;
+ this.info.Priority = session.Priority;
+ this.info.ClientId = session.Connection.ClientId;
+
+ // If the destination contained a URI query, then use it to set public properties
+ // on the ConsumerInfo
+ if(destination.Options != null)
+ {
+ // Get options prefixed with "consumer.*"
+ StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+ // Extract out custom extension options "consumer.nms.*"
+ StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
+ URISupport.SetProperties(this.info, options);
+ URISupport.SetProperties(this, customConsumerOptions, "nms.");
+ }
+
+ this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge &&
+ session.IsAutoAcknowledge && !this.info.Browser;
+
+ if (this.optimizeAcknowledge) {
+ this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
+ OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
+ }
+
+ this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
+ this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
+ this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
+ this.transactedIndividualAck = session.Connection.TransactedIndividualAck || this.nonBlockingRedelivery;
+ }
+
+ ~MessageConsumer()
+ {
+ Dispose(false);
+ }
+
+ #region Property Accessors
+
+ public long LastDeliveredSequenceId
+ {
+ get { return this.lastDeliveredSequenceId; }
+ }
+
+ public ConsumerId ConsumerId
+ {
+ get { return this.info.ConsumerId; }
+ }
+
+ public ConsumerInfo ConsumerInfo
+ {
+ get { return this.info; }
+ }
+
+ public int RedeliveryTimeout
+ {
+ get { return redeliveryTimeout; }
+ set { redeliveryTimeout = value; }
+ }
+
+ public int PrefetchSize
+ {
+ get { return this.info.PrefetchSize; }
+ }
+
+ public IRedeliveryPolicy RedeliveryPolicy
+ {
+ get { return this.redeliveryPolicy; }
+ set { this.redeliveryPolicy = value; }
+ }
+
+ public long UnconsumedMessageCount
+ {
+ get { return this.unconsumedMessages.Count; }
+ }
+
+ // Custom Options
+ private bool ignoreExpiration = false;
+ public bool IgnoreExpiration
+ {
+ get { return ignoreExpiration; }
+ set { ignoreExpiration = value; }
+ }
public Exception FailureError
{
@@ -228,1657 +231,1674 @@ namespace Apache.NMS.ActiveMQ
set { this.failureError = value; }
}
- public bool OptimizeAcknowledge
- {
- get { return this.optimizeAcknowledge; }
- set
- {
- if (optimizeAcknowledge && !value)
- {
- DeliverAcks();
- }
- this.optimizeAcknowledge = value;
- }
- }
-
- public long OptimizeAcknowledgeTimeOut
- {
- get { return this.optimizeAcknowledgeTimeOut; }
- set { this.optimizeAcknowledgeTimeOut = value; }
- }
-
- public long OptimizedAckScheduledAckInterval
- {
- get { return this.optimizedAckScheduledAckInterval; }
- set
- {
- this.optimizedAckScheduledAckInterval = value;
-
- if (this.optimizedAckTask != null)
- {
- this.session.Scheduler.Cancel(this.optimizedAckTask);
- this.optimizedAckTask = null;
- }
-
- // Should we periodically send out all outstanding acks.
- if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0)
- {
- this.optimizedAckTask = new WaitCallback(DoOptimizedAck);
- this.session.Scheduler.ExecutePeriodically(
- optimizedAckTask, null, TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval));
- }
- }
- }
-
- public long FailoverRedeliveryWaitPeriod
- {
- get { return this.failoverRedeliveryWaitPeriod; }
- set { this.failoverRedeliveryWaitPeriod = value; }
- }
-
- public bool TransactedIndividualAck
- {
- get { return this.transactedIndividualAck; }
- set { this.transactedIndividualAck = value; }
- }
-
- public bool NonBlockingRedelivery
- {
- get { return this.nonBlockingRedelivery; }
- set { this.nonBlockingRedelivery = value; }
- }
-
- #endregion
-
- #region IMessageConsumer Members
-
- private ConsumerTransformerDelegate consumerTransformer;
- /// <summary>
- /// A Delegate that is called each time a Message is dispatched to allow the client to do
- /// any necessary transformations on the received message before it is delivered.
- /// </summary>
- public ConsumerTransformerDelegate ConsumerTransformer
- {
- get { return this.consumerTransformer; }
- set { this.consumerTransformer = value; }
- }
-
- public event MessageListener Listener
- {
- add
- {
- CheckClosed();
-
- if(this.PrefetchSize == 0)
- {
- throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
- }
-
- bool wasStarted = this.session.Started;
-
- if(wasStarted)
- {
- this.session.Stop();
- }
-
- listener += value;
- this.session.Redispatch(this.unconsumedMessages);
-
- if(wasStarted)
- {
- this.session.Start();
- }
- }
- remove { listener -= value; }
- }
-
- public IMessage Receive()
- {
- CheckClosed();
- CheckMessageListener();
-
- SendPullRequest(0);
- MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
-
- if(dispatch == null)
- {
- return null;
- }
-
- BeforeMessageIsConsumed(dispatch);
- AfterMessageIsConsumed(dispatch, false);
-
- return CreateActiveMQMessage(dispatch);
- }
-
- public IMessage Receive(TimeSpan timeout)
- {
- CheckClosed();
- CheckMessageListener();
-
- MessageDispatch dispatch = null;
- SendPullRequest((long) timeout.TotalMilliseconds);
-
- if(this.PrefetchSize == 0)
- {
- dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
- }
- else
- {
- dispatch = this.Dequeue(timeout);
- }
-
- if(dispatch == null)
- {
- return null;
- }
-
- BeforeMessageIsConsumed(dispatch);
- AfterMessageIsConsumed(dispatch, false);
-
- return CreateActiveMQMessage(dispatch);
- }
-
- public IMessage ReceiveNoWait()
- {
- CheckClosed();
- CheckMessageListener();
-
- MessageDispatch dispatch = null;
- SendPullRequest(-1);
-
- if(this.PrefetchSize == 0)
- {
- dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
- }
- else
- {
- dispatch = this.Dequeue(TimeSpan.Zero);
- }
-
- if(dispatch == null)
- {
- return null;
- }
-
- BeforeMessageIsConsumed(dispatch);
- AfterMessageIsConsumed(dispatch, false);
-
- return CreateActiveMQMessage(dispatch);
- }
-
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
-
- protected void Dispose(bool disposing)
- {
- if(disposed)
- {
- return;
- }
-
- try
- {
- Close();
- }
- catch
- {
- // Ignore network errors.
- }
-
- disposed = true;
- }
-
- public virtual void Close()
- {
- if(!this.unconsumedMessages.Closed)
- {
- if(this.dispatchedMessages.Count != 0 && this.session.IsTransacted && this.session.TransactionContext.InTransaction)
- {
- Tracer.DebugFormat("Consumer {0} Registering new ConsumerCloseSynchronization",
- this.info.ConsumerId);
- this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
- }
- else
- {
- Tracer.DebugFormat("Consumer {0} No Active TX or pending acks, closing normally.",
- this.info.ConsumerId);
- this.DoClose();
- }
- }
- }
-
- internal void DoClose()
- {
- Shutdown();
- RemoveInfo removeCommand = new RemoveInfo();
- removeCommand.ObjectId = this.ConsumerId;
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Remove of Consumer[{0}] of destination [{1}] sent last delivered Id[{2}].",
- this.ConsumerId, this.info.Destination, this.lastDeliveredSequenceId);
- }
- removeCommand.LastDeliveredSequenceId = lastDeliveredSequenceId;
- this.session.Connection.Oneway(removeCommand);
- }
-
- /// <summary>
- /// Called from the parent Session of this Consumer to indicate that its
- /// parent session is closing and this Consumer should close down but not
- /// send any message to the Broker as the parent close will take care of
- /// removing its child resources at the broker.
- /// </summary>
- internal void Shutdown()
- {
- if(!this.unconsumedMessages.Closed)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
- }
-
- // Do we have any acks we need to send out before closing?
- // Ack any delivered messages now.
- if(!this.session.IsTransacted)
- {
- DeliverAcks();
- if(this.IsAutoAcknowledgeBatch)
- {
- Acknowledge();
- }
- }
-
- if (this.executor != null)
- {
- this.executor.Shutdown();
- this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
- this.executor = null;
- }
- if (this.optimizedAckTask != null)
- {
- this.session.Scheduler.Cancel(this.optimizedAckTask);
- }
-
- if (this.session.IsClientAcknowledge)
- {
- if (!this.info.Browser)
- {
- // rollback duplicates that aren't acknowledged
- LinkedList<MessageDispatch> temp = null;
- lock(this.dispatchedMessages)
- {
- temp = new LinkedList<MessageDispatch>(this.dispatchedMessages);
- }
- foreach (MessageDispatch old in temp)
- {
- this.session.Connection.RollbackDuplicate(this, old.Message);
- }
- temp.Clear();
- }
- }
-
- if(!this.session.IsTransacted)
- {
- lock(this.dispatchedMessages)
- {
- dispatchedMessages.Clear();
- }
- }
-
- this.session.RemoveConsumer(this);
- this.unconsumedMessages.Close();
-
- MessageDispatch[] unconsumed = unconsumedMessages.RemoveAll();
- if (!this.info.Browser)
- {
- foreach (MessageDispatch old in unconsumed)
- {
- // ensure we don't filter this as a duplicate
- session.Connection.RollbackDuplicate(this, old.Message);
- }
- }
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
- }
- }
- }
-
- #endregion
-
- protected void SendPullRequest(long timeout)
- {
- if(this.info.PrefetchSize == 0 && this.unconsumedMessages.Empty)
- {
- MessagePull messagePull = new MessagePull();
- messagePull.ConsumerId = this.info.ConsumerId;
- messagePull.Destination = this.info.Destination;
- messagePull.Timeout = timeout;
- messagePull.ResponseRequired = false;
-
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Sending MessagePull: " + messagePull);
- }
-
- session.Connection.Oneway(messagePull);
- }
- }
-
- protected void DoIndividualAcknowledge(ActiveMQMessage message)
- {
- MessageDispatch dispatch = null;
-
- lock(this.dispatchedMessages)
- {
- foreach(MessageDispatch originalDispatch in this.dispatchedMessages)
- {
- if(originalDispatch.Message.MessageId.Equals(message.MessageId))
- {
- dispatch = originalDispatch;
- this.dispatchedMessages.Remove(originalDispatch);
- break;
- }
- }
- }
-
- if(dispatch == null)
- {
- Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
- return;
- }
-
- MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
- Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
- this.session.SendAck(ack);
- }
-
- protected void DoNothingAcknowledge(ActiveMQMessage message)
- {
- }
-
- protected void DoClientAcknowledge(ActiveMQMessage message)
- {
- this.CheckClosed();
- Tracer.Debug("Sending Client Ack:");
- this.session.Acknowledge();
- }
-
- public void Start()
- {
- if(this.unconsumedMessages.Closed)
- {
- return;
- }
-
- this.started.Value = true;
- this.unconsumedMessages.Start();
- this.session.Executor.Wakeup();
- }
-
- public void Stop()
- {
- this.started.Value = false;
- this.unconsumedMessages.Stop();
- }
-
- public void DeliverAcks()
- {
- MessageAck ack = null;
-
- if(this.deliveringAcks.CompareAndSet(false, true))
- {
- if(this.IsAutoAcknowledgeEach)
- {
- lock(this.dispatchedMessages)
- {
- ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
- if(ack != null)
- {
- Tracer.Debug("Consumer - DeliverAcks clearing the Dispatch list");
- this.dispatchedMessages.Clear();
- this.ackCounter = 0;
- }
- else
- {
- ack = this.pendingAck;
- this.pendingAck = null;
- }
- }
- }
- else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
- {
- ack = pendingAck;
- pendingAck = null;
- }
-
- if(ack != null)
- {
- if (this.executor == null)
- {
- this.executor = new ThreadPoolExecutor();
- }
-
- this.executor.QueueUserWorkItem(AsyncDeliverAck, ack);
- }
- else
- {
- this.deliveringAcks.Value = false;
- }
- }
- }
-
- private void AsyncDeliverAck(object ack)
- {
- MessageAck pending = ack as MessageAck;
- try
- {
- this.session.SendAck(pending, true);
- }
- catch
- {
- Tracer.ErrorFormat("Consumer {0} Failed to deliver async Ack {1}",
- this.info.ConsumerId, pending);
- }
- finally
- {
- this.deliveringAcks.Value = false;
- }
- }
-
- internal void InProgressClearRequired()
- {
- inProgressClearRequiredFlag = true;
- // deal with delivered messages async to avoid lock contention with in progress acks
- clearDispatchList = true;
- }
-
- internal void ClearMessagesInProgress()
- {
- if(inProgressClearRequiredFlag)
- {
- // Called from a thread in the ThreadPool, so we wait until we can
- // get a lock on the unconsumed list then we clear it.
- lock(this.unconsumedMessages.SyncRoot)
- {
- if(inProgressClearRequiredFlag)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
- this.unconsumedMessages.Count + ") on transport interrupt");
- }
-
- // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
- MessageDispatch[] list = this.unconsumedMessages.RemoveAll();
- if (!this.info.Browser)
- {
- foreach (MessageDispatch old in list)
- {
- session.Connection.RollbackDuplicate(this, old.Message);
- }
- }
-
- // allow dispatch on this connection to resume
- this.session.Connection.TransportInterruptionProcessingComplete();
- this.inProgressClearRequiredFlag = false;
- }
- }
- }
- }
-
- private void ClearDispatchList()
- {
- if (this.clearDispatchList)
- {
- lock(this.dispatchedMessages)
- {
- if (this.clearDispatchList)
- {
- if (dispatchedMessages.Count != 0)
- {
- if (session.IsTransacted)
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
- this.info.ConsumerId, dispatchedMessages.Count);
- }
- if (previouslyDeliveredMessages == null)
- {
- previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.TransactionContext.TransactionId);
- }
- foreach (MessageDispatch delivered in dispatchedMessages)
- {
- this.previouslyDeliveredMessages[delivered.Message.MessageId] = false;
- }
- }
- else
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
- this.info.ConsumerId, dispatchedMessages.Count);
- }
- this.dispatchedMessages.Clear();
- this.pendingAck = null;
- }
- }
- this.clearDispatchList = false;
- }
- }
- }
- }
-
- public virtual void Dispatch(MessageDispatch dispatch)
- {
- MessageListener listener = this.listener;
- bool dispatchMessage = false;
-
- try
- {
- ClearMessagesInProgress();
- ClearDispatchList();
-
- lock(this.unconsumedMessages.SyncRoot)
- {
- if(!this.unconsumedMessages.Closed)
- {
- if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
- {
- if(listener != null && this.unconsumedMessages.Running)
- {
- if (RedeliveryExceeded(dispatch))
- {
- PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
- return;
- }
- else
- {
- dispatchMessage = true;
- }
- }
- else
- {
- if (!this.unconsumedMessages.Running)
- {
- // delayed redelivery, ensure it can be re delivered
- session.Connection.RollbackDuplicate(this, dispatch.Message);
- }
- this.unconsumedMessages.Enqueue(dispatch);
- }
- }
- else
- {
- if (!this.session.IsTransacted)
- {
- Tracer.Warn("Duplicate dispatch on connection: " + session.Connection.ConnectionId +
- " to consumer: " + ConsumerId + ", ignoring (auto acking) duplicate: " + dispatch);
- MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
- session.SendAck(ack);
- }
- else
- {
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer[{0}]: tracking transacted redelivery of duplicate: {1}",
- this.info.ConsumerId, dispatch.Message);
- }
- bool needsPoisonAck = false;
- lock(this.dispatchedMessages)
- {
- if (previouslyDeliveredMessages != null)
- {
- previouslyDeliveredMessages[dispatch.Message.MessageId] = true;
- }
- else
- {
- // delivery while pending redelivery to another consumer on the same connection
- // not waiting for redelivery will help here
- needsPoisonAck = true;
- }
- }
- if (needsPoisonAck)
- {
- MessageAck poisonAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
- poisonAck.FirstMessageId = dispatch.Message.MessageId;
- BrokerError cause = new BrokerError();
- cause.ExceptionClass = "javax.jms.JMSException";
- cause.Message = "Duplicate dispatch with transacted redeliver pending on another consumer, connection: " +
- session.Connection.ConnectionId;
- Tracer.Warn("Acking duplicate delivery as poison, redelivery must be pending to another" +
- " consumer on this connection, failoverRedeliveryWaitPeriod=" +
- failoverRedeliveryWaitPeriod + ". Message: " + dispatch + ", poisonAck: " + poisonAck);
- this.session.SendAck(poisonAck);
- }
- else
- {
- if (transactedIndividualAck)
- {
- ImmediateIndividualTransactedAck(dispatch);
- }
- else
- {
- this.session.SendAck(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1));
- }
- }
- }
- }
- }
- }
-
- if(dispatchMessage)
- {
- ActiveMQMessage message = CreateActiveMQMessage(dispatch);
-
- this.BeforeMessageIsConsumed(dispatch);
-
- try
- {
- bool expired = (!IgnoreExpiration && message.IsExpired());
-
- if(!expired)
- {
- listener(message);
- }
-
- this.AfterMessageIsConsumed(dispatch, expired);
- }
- catch(Exception e)
- {
- if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
- {
- // Schedule redelivery and possible dlq processing
- dispatch.RollbackCause = e;
- Rollback();
- }
- else
- {
- // Transacted or Client ack: Deliver the next message.
- this.AfterMessageIsConsumed(dispatch, false);
- }
-
- Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
-
- // If aborted we stop the abort here and let normal processing resume.
- // This allows the session to shutdown normally and ack all messages
- // that have outstanding acks in this consumer.
- if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
- {
- Thread.ResetAbort();
- }
- }
- }
-
- if(++dispatchedCount % 1000 == 0)
- {
- dispatchedCount = 0;
- Thread.Sleep(1);
- }
- }
- catch(Exception e)
- {
- this.session.Connection.OnSessionException(this.session, e);
- }
- }
-
- public bool Iterate()
- {
- if(this.listener != null)
- {
- MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
- if(dispatch != null)
- {
- this.Dispatch(dispatch);
- return true;
- }
- }
-
- return false;
- }
-
- /// <summary>
- /// Used to get an enqueued message from the unconsumedMessages list. The
- /// amount of time this method blocks is based on the timeout value. if
- /// timeout == Timeout.Infinite then it blocks until a message is received.
- /// if timeout == 0 then it it tries to not block at all, it returns a
- /// message if it is available if timeout > 0 then it blocks up to timeout
- /// amount of time. Expired messages will consumed by this method.
- /// </summary>
- /// <param name="timeout">
- /// A <see cref="System.TimeSpan"/>
- /// </param>
- /// <returns>
- /// A <see cref="MessageDispatch"/>
- /// </returns>
- private MessageDispatch Dequeue(TimeSpan timeout)
- {
- DateTime deadline = DateTime.Now;
-
- if(timeout > TimeSpan.Zero)
- {
- deadline += timeout;
- }
-
- while(true)
- {
- MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
-
- // Grab a single date/time for calculations to avoid timing errors.
- DateTime dispatchTime = DateTime.Now;
-
- if(dispatch == null)
- {
- if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
- {
- if(dispatchTime > deadline)
- {
- // Out of time.
- timeout = TimeSpan.Zero;
- }
- else
- {
- // Adjust the timeout to the remaining time.
- timeout = deadline - dispatchTime;
- }
- }
- else
- {
- // Informs the caller of an error in the event that an async exception
- // took down the parent connection.
- if(this.failureError != null)
- {
- throw NMSExceptionSupport.Create(this.failureError);
- }
-
- return null;
- }
- }
- else if(dispatch.Message == null)
- {
- return null;
- }
- else if(!IgnoreExpiration && dispatch.Message.IsExpired())
- {
- Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
-
- BeforeMessageIsConsumed(dispatch);
- AfterMessageIsConsumed(dispatch, true);
- // Refresh the dispatch time
- dispatchTime = DateTime.Now;
-
- if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
- {
- if(dispatchTime > deadline)
- {
- // Out of time.
- timeout = TimeSpan.Zero;
- }
- else
- {
- // Adjust the timeout to the remaining time.
- timeout = deadline - dispatchTime;
- }
- }
- }
- else if (RedeliveryExceeded(dispatch))
+ public bool OptimizeAcknowledge
+ {
+ get { return this.optimizeAcknowledge; }
+ set
+ {
+ if (optimizeAcknowledge && !value)
{
- Tracer.DebugFormat("[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch);
- PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+ DeliverAcks();
}
- else
- {
- return dispatch;
- }
- }
- }
-
- public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
- {
- this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
-
- if (!IsAutoAcknowledgeBatch)
- {
- lock(this.dispatchedMessages)
- {
- this.dispatchedMessages.AddFirst(dispatch);
- }
-
- if (this.session.IsTransacted)
- {
- if (this.transactedIndividualAck)
- {
- ImmediateIndividualTransactedAck(dispatch);
- }
- else
- {
- this.AckLater(dispatch, AckType.DeliveredAck);
- }
- }
- }
- }
+ this.optimizeAcknowledge = value;
+ }
+ }
- private bool IsOptimizedAckTime()
- {
- // evaluate both expired and normal msgs as otherwise consumer may get stalled
- if (ackCounter + deliveredCounter >= (this.info.PrefetchSize * .65))
- {
- return true;
- }
-
- if (optimizeAcknowledgeTimeOut > 0)
- {
- DateTime deadline = optimizeAckTimestamp +
- TimeSpan.FromMilliseconds(optimizeAcknowledgeTimeOut);
-
- if (DateTime.Now >= deadline)
- {
- return true;
- }
- }
-
- return false;
- }
-
- public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
- {
- if(this.unconsumedMessages.Closed)
- {
- return;
- }
-
- if(expired)
- {
- lock(this.dispatchedMessages)
- {
- this.dispatchedMessages.Remove(dispatch);
- }
-
- Acknowledge(dispatch, AckType.DeliveredAck);
- }
- else
- {
- if(this.session.IsTransacted)
- {
- // Do nothing.
- }
- else if(this.IsAutoAcknowledgeEach)
- {
- if(this.deliveringAcks.CompareAndSet(false, true))
- {
- lock(this.dispatchedMessages)
- {
- if(this.dispatchedMessages.Count != 0)
- {
- if (this.optimizeAcknowledge)
- {
- this.ackCounter++;
-
- if (IsOptimizedAckTime())
- {
- MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
- if (ack != null)
- {
- this.dispatchedMessages.Clear();
- this.ackCounter = 0;
- this.session.SendAck(ack);
- this.optimizeAckTimestamp = DateTime.Now;
- }
- // as further optimization send ack for expired msgs wehn
- // there are any. This resets the deliveredCounter to 0 so
- // that we won't sent standard acks with every msg just
- // because the deliveredCounter just below 0.5 * prefetch
- // as used in ackLater()
- if (this.pendingAck != null && this.deliveredCounter > 0)
- {
- this.session.SendAck(pendingAck);
- this.pendingAck = null;
- this.deliveredCounter = 0;
- }
- }
- }
- else
- {
- MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
- if (ack != null)
- {
- this.dispatchedMessages.Clear();
- this.session.SendAck(ack);
- }
- }
- }
- }
- this.deliveringAcks.Value = false;
- }
- }
- else if(this.IsAutoAcknowledgeBatch)
- {
- AckLater(dispatch, AckType.ConsumedAck);
- }
- else if(IsClientAcknowledge || IsIndividualAcknowledge)
- {
- bool messageAckedByConsumer = false;
-
- lock(this.dispatchedMessages)
- {
- messageAckedByConsumer = this.dispatchedMessages.Contains(dispatch);
- }
-
- if(messageAckedByConsumer)
- {
- AckLater(dispatch, AckType.DeliveredAck);
- }
- }
- else
- {
- throw new NMSException("Invalid session state.");
- }
- }
- }
-
- private MessageAck MakeAckForAllDeliveredMessages(AckType type)
- {
- lock(this.dispatchedMessages)
- {
- if(this.dispatchedMessages.Count == 0)
- {
- return null;
- }
-
- MessageDispatch dispatch = this.dispatchedMessages.First.Value;
- MessageAck ack = new MessageAck(dispatch, (byte) type, this.dispatchedMessages.Count);
- ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
- return ack;
- }
- }
-
- private void AckLater(MessageDispatch dispatch, AckType type)
- {
- // Don't acknowledge now, but we may need to let the broker know the
- // consumer got the message to expand the pre-fetch window
- if(this.session.IsTransacted)
- {
- RegisterSync();
- }
-
- this.deliveredCounter++;
-
- MessageAck oldPendingAck = pendingAck;
-
- pendingAck = new MessageAck(dispatch, (byte) type, deliveredCounter);
-
- if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
- {
- pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
- }
-
- if(oldPendingAck == null)
- {
- pendingAck.FirstMessageId = pendingAck.LastMessageId;
- }
- else if(oldPendingAck.AckType == pendingAck.AckType)
- {
- pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
- }
- else
- {
- // old pending ack being superseded by ack of another type, if is is not a delivered
- // ack and hence important, send it now so it is not lost.
- if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
- }
-
- this.session.SendAck(oldPendingAck);
- }
- else
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
- }
- }
- }
-
- // evaluate both expired and normal msgs as otherwise consumer may get stalled
- if ((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
- {
- this.session.SendAck(pendingAck);
- this.pendingAck = null;
- this.deliveredCounter = 0;
- this.additionalWindowSize = 0;
- }
- }
-
- private void ImmediateIndividualTransactedAck(MessageDispatch dispatch)
- {
- // acks accumulate on the broker pending transaction completion to indicate
- // delivery status
- RegisterSync();
- MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
- ack.TransactionId = session.TransactionContext.TransactionId;
- this.session.Connection.SyncRequest(ack);
- }
+ public long OptimizeAcknowledgeTimeOut
+ {
+ get { return this.optimizeAcknowledgeTimeOut; }
+ set { this.optimizeAcknowledgeTimeOut = value; }
+ }
- private void PosionAck(MessageDispatch dispatch, string cause)
+ public long OptimizedAckScheduledAckInterval
{
- BrokerError poisonCause = new BrokerError();
- poisonCause.ExceptionClass = "javax.jms.JMSException";
- poisonCause.Message = cause;
+ get { return this.optimizedAckScheduledAckInterval; }
+ set
+ {
+ this.optimizedAckScheduledAckInterval = value;
- MessageAck posionAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
- posionAck.FirstMessageId = dispatch.Message.MessageId;
- posionAck.PoisonCause = poisonCause;
- this.session.Connection.SyncRequest(posionAck);
+ if (this.optimizedAckTask != null)
+ {
+ this.session.Scheduler.Cancel(this.optimizedAckTask);
+ this.optimizedAckTask = null;
+ }
+
+ // Should we periodically send out all outstanding acks.
+ if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0)
+ {
+ this.optimizedAckTask = new WaitCallback(DoOptimizedAck);
+ this.session.Scheduler.ExecutePeriodically(
+ optimizedAckTask, null, TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval));
+ }
+ }
}
- private void RegisterSync()
- {
- // Don't acknowledge now, but we may need to let the broker know the
- // consumer got the message to expand the pre-fetch window
- if(this.session.IsTransacted)
- {
- this.session.DoStartTransaction();
-
- if(!synchronizationRegistered)
- {
- Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
- this.info.ConsumerId);
- this.synchronizationRegistered = true;
- this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
- }
- }
- }
-
- private void Acknowledge(MessageDispatch dispatch, AckType ackType)
- {
- MessageAck ack = new MessageAck(dispatch, (byte) ackType, 1);
- this.session.SendAck(ack);
- lock(this.dispatchedMessages)
- {
- dispatchedMessages.Remove(dispatch);
- }
- }
-
- internal void Acknowledge()
- {
- ClearDispatchList();
- WaitForRedeliveries();
-
- lock(this.dispatchedMessages)
- {
- // Acknowledge all messages so far.
- MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-
- if(ack == null)
- {
- return; // no msgs
- }
-
- if(this.session.IsTransacted)
- {
- RollbackOnFailedRecoveryRedelivery();
- if (!this.session.TransactionContext.InTransaction)
- {
- this.session.DoStartTransaction();
- }
- ack.TransactionId = this.session.TransactionContext.TransactionId;
- }
+ public long FailoverRedeliveryWaitPeriod
+ {
+ get { return this.failoverRedeliveryWaitPeriod; }
+ set { this.failoverRedeliveryWaitPeriod = value; }
+ }
- this.session.SendAck(ack);
- this.pendingAck = null;
+ public bool TransactedIndividualAck
+ {
+ get { return this.transactedIndividualAck; }
+ set { this.transactedIndividualAck = value; }
+ }
- // Adjust the counters
- this.deliveredCounter = Math.Max(0, this.deliveredCounter - this.dispatchedMessages.Count);
- this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
-
- if(!this.session.IsTransacted)
- {
- this.dispatchedMessages.Clear();
- }
- }
- }
-
- internal void Commit()
- {
- lock(this.dispatchedMessages)
- {
- this.dispatchedMessages.Clear();
- ClearPreviouslyDelivered();
- }
-
- this.redeliveryDelay = 0;
- }
-
- internal void Rollback()
- {
- lock(this.unconsumedMessages.SyncRoot)
- {
- if (this.optimizeAcknowledge)
- {
- // remove messages read but not acked at the broker yet through optimizeAcknowledge
- if (!this.info.Browser)
- {
- lock(this.dispatchedMessages)
- {
- for (int i = 0; (i < this.dispatchedMessages.Count) && (i < ackCounter); i++)
- {
- // ensure we don't filter this as a duplicate
- MessageDispatch dispatch = this.dispatchedMessages.Last.Value;
- this.dispatchedMessages.RemoveLast();
- session.Connection.RollbackDuplicate(this, dispatch.Message);
- }
- }
- }
- }
- lock(this.dispatchedMessages)
- {
- RollbackPreviouslyDeliveredAndNotRedelivered();
- if(this.dispatchedMessages.Count == 0)
- {
- Tracer.DebugFormat("Consumer {0} Rolled Back, no dispatched Messages",
- this.info.ConsumerId);
- return;
- }
-
- // Only increase the redelivery delay after the first redelivery..
- MessageDispatch lastMd = this.dispatchedMessages.First.Value;
- int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;
-
- redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
-
- MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
-
- foreach(MessageDispatch dispatch in this.dispatchedMessages)
- {
- // Allow the message to update its internal to reflect a Rollback.
- dispatch.Message.OnMessageRollback();
- // ensure we don't filter this as a duplicate
- session.Connection.RollbackDuplicate(this, dispatch.Message);
- }
-
- if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
- lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
- {
- // We need to NACK the messages so that they get sent to the DLQ.
- MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, dispatchedMessages.Count);
-
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer {0} Poison Ack of {1} messages aft max redeliveries: {2}",
- this.info.ConsumerId, this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
- }
+ public bool NonBlockingRedelivery
+ {
+ get { return this.nonBlockingRedelivery; }
+ set { this.nonBlockingRedelivery = value; }
+ }
- BrokerError poisonCause = new BrokerError();
- poisonCause.ExceptionClass = "javax.jms.JMSException";
- poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;
+ #endregion
- if (lastMd.RollbackCause != null)
- {
- BrokerError cause = new BrokerError();
- poisonCause.ExceptionClass = "javax.jms.JMSException";
- poisonCause.Message = lastMd.RollbackCause.Message;
- poisonCause.Cause = cause;
- }
- ack.FirstMessageId = firstMsgId;
- ack.PoisonCause = poisonCause;
+ #region IMessageConsumer Members
- this.session.SendAck(ack);
+ private ConsumerTransformerDelegate consumerTransformer;
+ /// <summary>
+ /// A Delegate that is called each time a Message is dispatched to allow the client to do
+ /// any necessary transformations on the received message before it is delivered.
+ /// </summary>
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumerTransformer; }
+ set { this.consumerTransformer = value; }
+ }
- // Adjust the window size.
- additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+ public event MessageListener Listener
+ {
+ add
+ {
+ CheckClosed();
- this.redeliveryDelay = 0;
- this.deliveredCounter -= this.dispatchedMessages.Count;
- this.dispatchedMessages.Clear();
- }
- else
- {
- // We only send a RedeliveryAck after the first redelivery
- if(currentRedeliveryCount > 0)
- {
- MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, dispatchedMessages.Count);
- ack.FirstMessageId = firstMsgId;
- this.session.SendAck(ack);
- }
-
- if (this.nonBlockingRedelivery)
- {
- if(redeliveryDelay == 0)
- {
- redeliveryDelay = RedeliveryPolicy.InitialRedeliveryDelay;
- }
-
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages in Non-Blocking mode, delay: {2}",
- this.info.ConsumerId, this.dispatchedMessages.Count, redeliveryDelay);
- }
+ if(this.PrefetchSize == 0)
+ {
+ throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
+ }
- List<MessageDispatch> pendingRedeliveries =
- new List<MessageDispatch>(this.dispatchedMessages);
- pendingRedeliveries.Reverse();
+ bool wasStarted = this.session.Started;
- this.deliveredCounter -= this.dispatchedMessages.Count;
- this.dispatchedMessages.Clear();
+ if(wasStarted)
+ {
+ this.session.Stop();
+ }
- this.session.Scheduler.ExecuteAfterDelay(
- NonBlockingRedeliveryCallback,
- pendingRedeliveries,
- TimeSpan.FromMilliseconds(redeliveryDelay));
- }
- else
- {
- // stop the delivery of messages.
- this.unconsumedMessages.Stop();
-
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages",
- this.info.ConsumerId, this.dispatchedMessages.Count);
- }
-
- foreach(MessageDispatch dispatch in this.dispatchedMessages)
- {
- this.unconsumedMessages.EnqueueFirst(dispatch);
- }
-
- this.deliveredCounter -= this.dispatchedMessages.Count;
- this.dispatchedMessages.Clear();
-
- if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
- {
- DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
- ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
- }
- else
- {
- Start();
- }
- }
- }
- }
- }
-
- // Only redispatch if there's an async listener otherwise a synchronous
- // consumer will pull them from the local queue.
- if(this.listener != null)
- {
- this.session.Redispatch(this.unconsumedMessages);
- }
- }
-
- private void RollbackHelper(Object arg)
- {
- try
- {
- TimeSpan waitTime = (DateTime) arg - DateTime.Now;
-
- if(waitTime.CompareTo(TimeSpan.Zero) > 0)
- {
- Thread.Sleep(waitTime);
- }
-
- this.Start();
- }
- catch(Exception e)
- {
- if(!this.unconsumedMessages.Closed)
- {
- this.session.Connection.OnSessionException(this.session, e);
- }
- }
- }
-
- private void NonBlockingRedeliveryCallback(object arg)
- {
- try
- {
- if (!this.unconsumedMessages.Closed)
- {
- List<MessageDispatch> pendingRedeliveries = arg as List<MessageDispatch>;
+ listener += value;
+ this.session.Redispatch(this, this.unconsumedMessages);
- foreach (MessageDispatch dispatch in pendingRedeliveries)
- {
- session.Dispatch(dispatch);
- }
+ if(wasStarted)
+ {
+ this.session.Start();
}
- }
- catch (Exception e)
- {
- session.Connection.OnAsyncException(e);
- }
- }
-
- private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch)
- {
- ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
-
- if(this.ConsumerTransformer != null)
- {
- IMessage newMessage = ConsumerTransformer(this.session, this, message);
- if(newMessage != null)
- {
- message = this.messageTransformation.TransformMessage<ActiveMQMessage>(newMessage);
- }
- }
-
- message.Connection = this.session.Connection;
-
- if(IsClientAcknowledge)
- {
- message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
- }
- else if(IsIndividualAcknowledge)
- {
- message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge);
- }
- else
- {
- message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
- }
-
- return message;
- }
-
- private void CheckClosed()
- {
- if(this.unconsumedMessages.Closed)
- {
- throw new NMSException("The Consumer has been Closed");
- }
- }
-
- private void CheckMessageListener()
- {
- if(this.listener != null)
- {
- throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
- }
- }
-
- protected bool IsAutoAcknowledgeEach
- {
- get
- {
- return this.session.IsAutoAcknowledge ||
- (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue);
- }
- }
-
- protected bool IsAutoAcknowledgeBatch
- {
- get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
- }
+ }
+ remove { listener -= value; }
+ }
- protected bool IsIndividualAcknowledge
- {
- get { return this.session.IsIndividualAcknowledge; }
- }
+ public IMessage Receive()
+ {
+ CheckClosed();
+ CheckMessageListener();
- protected bool IsClientAcknowledge
- {
- get { return this.session.IsClientAcknowledge; }
- }
+ SendPullRequest(0);
+ MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
- internal bool IsInUse(ActiveMQTempDestination dest)
+ if(dispatch == null)
+ {
+ return null;
+ }
+
+ BeforeMessageIsConsumed(dispatch);
+ AfterMessageIsConsumed(dispatch, false);
+
+ return CreateActiveMQMessage(dispatch);
+ }
+
+ public IMessage Receive(TimeSpan timeout)
{
- return this.info.Destination.Equals(dest);
+ CheckClosed();
+ CheckMessageListener();
+
+ MessageDispatch dispatch = null;
+ SendPullRequest((long) timeout.TotalMilliseconds);
+
+ if(this.PrefetchSize == 0)
+ {
+ dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+ }
+ else
+ {
+ dispatch = this.Dequeue(timeout);
+ }
+
+ if(dispatch == null)
+ {
+ return null;
+ }
+
+ BeforeMessageIsConsumed(dispatch);
+ AfterMessageIsConsumed(dispatch, false);
+
+ return CreateActiveMQMessage(dispatch);
}
- internal bool Closed
- {
- get { return this.unconsumedMessages.Closed; }
- }
+ public IMessage ReceiveNoWait()
+ {
+ CheckClosed();
+ CheckMessageListener();
+
+ MessageDispatch dispatch = null;
+ SendPullRequest(-1);
+
+ if(this.PrefetchSize == 0)
+ {
+ dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+ }
+ else
+ {
+ dispatch = this.Dequeue(TimeSpan.Zero);
+ }
+
+ if(dispatch == null)
+ {
+ return null;
+ }
- private void DoOptimizedAck(object state)
- {
- if (this.optimizeAcknowledge && !this.unconsumedMessages.Closed)
- {
- DeliverAcks();
- }
- }
-
- private void WaitForRedeliveries()
- {
- if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null)
- {
- DateTime expiry = DateTime.Now + TimeSpan.FromMilliseconds(failoverRedeliveryWaitPeriod);
- int numberNotReplayed;
- do
- {
- numberNotReplayed = 0;
- lock(this.dispatchedMessages)
- {
- if (previouslyDeliveredMessages != null)
- {
- foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
- {
- if (!entry.Value)
- {
- numberNotReplayed++;
- }
- }
- }
- }
- if (numberNotReplayed > 0)
- {
- Tracer.Info("waiting for redelivery of " + numberNotReplayed + " in transaction: " +
- previouslyDeliveredMessages.TransactionId + ", to consumer :" +
- this.info.ConsumerId);
- Thread.Sleep((int) Math.Max(500, failoverRedeliveryWaitPeriod/4));
- }
- }
- while (numberNotReplayed > 0 && expiry > DateTime.Now);
- }
- }
-
- // called with deliveredMessages locked
- private void RollbackOnFailedRecoveryRedelivery()
- {
- if (previouslyDeliveredMessages != null)
- {
- // if any previously delivered messages was not re-delivered, transaction is
- // invalid and must rollback as messages have been dispatched else where.
- int numberNotReplayed = 0;
- foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
- {
- if (!entry.Value)
- {
- numberNotReplayed++;
- if (Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("previously delivered message has not been replayed in transaction: " +
- previouslyDeliveredMessages.TransactionId + " , messageId: " + entry.Key);
- }
- }
- }
-
- if (numberNotReplayed > 0)
- {
- String message = "rolling back transaction (" +
- previouslyDeliveredMessages.TransactionId + ") post failover recovery. " + numberNotReplayed +
- " previously delivered message(s) not replayed to consumer: " + this.info.ConsumerId;
- Tracer.Warn(message);
- throw new TransactionRolledBackException(message);
- }
- }
- }
-
- // called with unconsumedMessages && dispatchedMessages locked
- // remove any message not re-delivered as they can't be replayed to this
- // consumer on rollback
- private void RollbackPreviouslyDeliveredAndNotRedelivered()
- {
- if (previouslyDeliveredMessages != null)
- {
- foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
- {
- if (!entry.Value)
- {
- RemoveFromDeliveredMessages(entry.Key);
- }
- }
-
- ClearPreviouslyDelivered();
- }
- }
-
- // Must be called with dispatchedMessages locked
- private void RemoveFromDeliveredMessages(MessageId key)
- {
- MessageDispatch toRemove = null;
- foreach(MessageDispatch candidate in this.dispatchedMessages)
- {
- if (candidate.Message.MessageId.Equals(key))
- {
- session.Connection.RollbackDuplicate(this, candidate.Message);
- toRemove = candidate;
- break;
- }
- }
-
- if (toRemove != null)
- {
- this.dispatchedMessages.Remove(toRemove);
- }
- }
-
- // called with deliveredMessages locked
- private void ClearPreviouslyDelivered()
- {
- if (previouslyDeliveredMessages != null)
- {
- previouslyDeliveredMessages.Clear();
- previouslyDeliveredMessages = null;
- }
- }
-
- #region Transaction Redelivery Tracker
-
- class PreviouslyDeliveredMap : Dictionary<MessageId, bool>
- {
- private TransactionId transactionId;
- public TransactionId TransactionId
- {
- get { return this.transactionId; }
- }
-
- public PreviouslyDeliveredMap(TransactionId transactionId) : base()
- {
- this.transactionId = transactionId;
- }
- }
+ BeforeMessageIsConsumed(dispatch);
+ AfterMessageIsConsumed(dispatch, false);
+
+ return CreateActiveMQMessage(dispatch);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
- private bool RedeliveryExceeded(MessageDispatch dispatch)
+ protected void Dispose(bool disposing)
{
- try
+ if(disposed)
{
- ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage;
+ return;
+ }
- return session.IsTransacted && redeliveryPolicy != null &&
- redeliveryPolicy.MaximumRedeliveries != NO_MAXIMUM_REDELIVERIES &&
- dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries &&
- // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
- !amqMessage.Properties.Contains("redeliveryDelay");
+ try
+ {
+ Close();
}
- catch (Exception ignored)
+ catch
{
- return false;
+ // Ignore network errors.
}
+
+ disposed = true;
}
- #endregion
+ public virtual void Close()
+ {
+ if(!this.unconsumedMessages.Closed)
+ {
+ if(this.deliveredMessages.Count != 0 && this.session.IsTransacted && this.session.TransactionContext.InTransaction)
+ {
+ Tracer.DebugFormat("Consumer[{0}] Registering new ConsumerCloseSynchronization",
+ this.info.ConsumerId);
+ this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
+ }
+ else
+ {
+ Tracer.DebugFormat("Consumer[{0}] No Active TX or pending acks, closing normally.",
+ this.info.ConsumerId);
+ this.DoClose();
+ }
+ }
+ }
- #region Nested ISyncronization Types
+ internal void DoClose()
+ {
+ Shutdown();
+ RemoveInfo removeCommand = new RemoveInfo();
+ removeCommand.ObjectId = this.ConsumerId;
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Remove of Consumer[{0}] of destination [{1}] sent last delivered Id[{2}].",
+ this.ConsumerId, this.info.Destination, this.lastDeliveredSequenceId);
+ }
+ removeCommand.LastDeliveredSequenceId = lastDeliveredSequenceId;
+ this.session.Connection.Oneway(removeCommand);
+ }
- class MessageConsumerSynchronization : ISynchronization
- {
- private readonly MessageConsumer consumer;
-
- public MessageConsumerSynchronization(MessageConsumer consumer)
- {
- this.consumer = consumer;
- }
-
- public void BeforeEnd()
- {
- Tracer.DebugFormat("MessageConsumerSynchronization - BeforeEnd Called for Consumer {0}.",
- this.consumer.ConsumerId);
+ /// <summary>
+ /// Called from the parent Session of this Consumer to indicate that its
+ /// parent session is closing and this Consumer should close down but not
+ /// send any message to the Broker as the parent close will take care of
+ /// removing its child resources at the broker.
+ /// </summary>
+ internal void Shutdown()
+ {
+ if(!this.unconsumedMessages.Closed)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
+ }
- if (this.consumer.TransactedIndividualAck)
- {
- this.consumer.ClearDispatchList();
- this.consumer.WaitForRedeliveries();
- lock(this.consumer.dispatchedMessages)
- {
- this.consumer.RollbackOnFailedRecoveryRedelivery();
+ // Do we have any acks we need to send out before closing?
+ // Ack any delivered messages now.
+ if(!this.session.IsTransacted)
+ {
+ DeliverAcks();
+ if(this.IsAutoAcknowledgeBatch)
+ {
+ Acknowledge();
}
- }
- else
- {
- this.consumer.Acknowledge();
}
- this.consumer.synchronizationRegistered = false;
- }
-
- public void AfterCommit()
- {
- Tracer.DebugFormat("MessageConsumerSynchronization - AfterCommit Called for Consumer {0}.",
- this.consumer.ConsumerId);
- this.consumer.Commit();
- this.consumer.synchronizationRegistered = false;
- }
-
- public void AfterRollback()
- {
- Tracer.DebugFormat("MessageConsumerSynchronization - AfterRollback Called for Consumer {0}.",
- this.consumer.ConsumerId);
- this.consumer.Rollback();
- this.consumer.synchronizationRegistered = false;
- }
- }
-
- protected class ConsumerCloseSynchronization : ISynchronization
- {
- private readonly MessageConsumer consumer;
-
- public ConsumerCloseSynchronization(MessageConsumer consumer)
- {
- this.consumer = consumer;
- }
-
- public void BeforeEnd()
- {
- }
-
- public void AfterCommit()
- {
- if (!this.consumer.Closed)
+ if (this.executor != null)
{
- Tracer.DebugFormat("ConsumerCloseSynchronization - AfterCommit Called for Consumer {0}.",
- this.consumer.ConsumerId);
- this.consumer.DoClose();
+ this.executor.Shutdown();
+ this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
+ this.executor = null;
+ }
+ if (this.optimizedAckTask != null)
+ {
+ this.session.Scheduler.Cancel(this.optimizedAckTask);
}
- }
- public void AfterRollback()
- {
- if (!this.consumer.Closed)
+ if (this.session.IsClientAcknowledge)
{
- Tracer.DebugFormat("ConsumerCloseSynchronization - AfterRollback Called for Consumer {0}.",
- this.consumer.ConsumerId);
- this.consumer.DoClose();
+ if (!this.info.Browser)
+ {
+ // rollback duplicates that aren't acknowledged
+ LinkedList<MessageDispatch> temp = null;
+ lock(this.deliveredMessages)
+ {
+ temp = new LinkedList<MessageDispatch>(this.deliveredMessages);
+ }
+ foreach (MessageDispatch old in temp)
+ {
+ this.session.Connection.RollbackDuplicate(this, old.Message);
+ }
+ temp.Clear();
+ }
+ }
+
+ if(!this.session.IsTransacted)
+ {
+ lock(this.deliveredMessages)
+ {
+ deliveredMessages.Clear();
+ }
+ }
+
+ this.session.RemoveConsumer(this);
+ this.unconsumedMessages.Close();
+
+ MessageDispatch[] unconsumed = unconsumedMessages.RemoveAll();
+ if (!this.info.Browser)
+ {
+ foreach (MessageDispatch old in unconsumed)
+ {
+ // ensure we don't filter this as a duplicate
+ session.Connection.RollbackDuplicate(this, old.Message);
+ }
+ }
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
+ }
+ }
+ }
+
+ #endregion
+
+ protected void SendPullRequest(long timeout)
+ {
+ if(this.info.PrefetchSize == 0 && this.unconsumedMessages.Empty)
+ {
+ MessagePull messagePull = new MessagePull();
+ messagePull.ConsumerId = this.info.ConsumerId;
+ messagePull.Destination = this.info.Destination;
+ messagePull.Timeout = timeout;
+ messagePull.ResponseRequired = false;
+
+ Tracer.DebugFormat("Consumer[{0}] sending MessagePull: {1}", ConsumerId, messagePull);
+
+ session.Connection.Oneway(messagePull);
+ }
+ }
+
+ protected void DoIndividualAcknowledge(ActiveMQMessage message)
+ {
+ MessageDispatch dispatch = null;
+
+ lock(this.deliveredMessages)
+ {
+ foreach(MessageDispatch originalDispatch in this.deliveredMessages)
+ {
+ if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+ {
+ dispatch = originalDispatch;
+ this.deliveredMessages.Remove(originalDispatch);
+ break;
+ }
}
- }
- }
+ }
+
+ if(dispatch == null)
+ {
+ Tracer.DebugFormat("Consumer[{0}] attempt to Ack MessageId[{1}] failed " +
+ "because the original dispatch is not in the Dispatch List",
+ ConsumerId, message.MessageId);
+ return;
+ }
+
+ MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+ Tracer.DebugFormat("Consumer[{0}] sending Individual Ack for MessageId: {1}",
+ ConsumerId, ack.LastMessageId);
+ this.session.SendAck(ack);
+ }
+
+ protected void DoNothingAcknowledge(ActiveMQMessage message)
+ {
+ }
+
+ protected void DoClientAcknowledge(ActiveMQMessage message)
+ {
+ this.CheckClosed();
+ Tracer.DebugFormat("Consumer[{0}] sending Client Ack", ConsumerId);
+ this.session.Acknowledge();
+ }
+
+ public void Start()
+ {
+ if(this.unconsumedMessages.Closed)
+ {
+ return;
+ }
+
+ this.started.Value = true;
+ this.unconsumedMessages.Start();
+ this.session.Executor.Wakeup();
+ }
+
+ public void Stop()
+ {
+ this.started.Value = false;
+ this.unconsumedMessages.Stop();
+ }
+
+ public void DeliverAcks()
+ {
+ MessageAck ack = null;
+
+ if(this.deliveringAcks.CompareAndSet(false, true))
+ {
+ if(this.IsAutoAcknowledgeEach)
+ {
+ lock(this.deliveredMessages)
+ {
+ ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+ if(ack != null)
+ {
+ Tracer.DebugFormat("Consumer[{0}] DeliverAcks clearing the Dispatch list", ConsumerId);
+ this.deliveredMessages.Clear();
+ this.ackCounter = 0;
+ }
+ else
+ {
+ ack = this.pendingAck;
+ this.pendingAck = null;
+ }
+ }
+ }
+ else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
+ {
+ ack = pendingAck;
+ pendingAck = null;
+ }
+
+ if(ack != null)
+ {
+ if (this.executor == null)
+ {
+ this.executor = new ThreadPoolExecutor();
+ }
+
+ this.executor.QueueUserWorkItem(AsyncDeliverAck, ack);
+ }
+ else
+ {
+ this.deliveringAcks.Value = false;
+ }
+ }
+ }
+
+ private void AsyncDeliverAck(object ack)
+ {
+ MessageAck pending = ack as MessageAck;
+ try
+ {
+ this.session.SendAck(pending, true);
+ }
+ catch
+ {
+ Tracer.ErrorFormat("Consumer[{0}] Failed to deliver async Ack {1}",
+ this.info.ConsumerId, pending);
+ }
+ finally
+ {
+ this.deliveringAcks.Value = false;
+ }
+ }
+
+ internal void InProgressClearRequired()
+ {
+ inProgressClearRequiredFlag = true;
+ // deal with delivered messages async to avoid lock contention with in progress acks
+ clearDeliveredList = true;
+ }
+
+ internal void ClearMessagesInProgress()
+ {
+ if(inProgressClearRequiredFlag)
+ {
+ // Called from a thread in the ThreadPool, so we wait until we can
+ // get a lock on the unconsumed list then we clear it.
+ lock(this.unconsumedMessages.SyncRoot)
+ {
+ if(inProgressClearRequiredFlag)
+ {
+ Tracer.DebugFormat("Consumer[{0}] clearing unconsumed list ({1}) on transport interrupt",
+ ConsumerId, this.unconsumedMessages.Count);
+
+ // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
+ MessageDispatch[] list = this.unconsumedMessages.RemoveAll();
+ if (!this.info.Browser)
+ {
+ foreach (MessageDispatch unconsumed in list)
+ {
+ session.Connection.RollbackDuplicate(this, unconsumed.Message);
+ }
+ }
+
+ // allow dispatch on this connection to resume
+ this.session.Connection.TransportInterruptionProcessingComplete();
+ this.inProgressClearRequiredFlag = false;
+
+ // Wake up anyone blocked on the message channel.
+ this.unconsumedMessages.Signal();
+ }
+ }
+ }
+ ClearDeliveredList();
+ }
+
+ private void ClearDeliveredList()
+ {
+ if (this.clearDeliveredList)
+ {
+ lock(this.deliveredMessages)
+ {
+ if (!this.clearDeliveredList || deliveredMessages.Count == 0)
+ {
+ clearDeliveredList = false;
+ return;
+ }
+
+ if (session.IsTransacted)
+ {
+ Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
+ this.info.ConsumerId, deliveredMessages.Count);
+
+ if (previouslyDeliveredMessages == null)
+ {
+ previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.TransactionContext.TransactionId);
+ }
+
+ foreach (MessageDispatch delivered in deliveredMessages)
+ {
+ this.previouslyDeliveredMessages[delivered.Message.MessageId] = false;
+ }
+ }
+ else
+ {
+ if (this.session.IsClientAcknowledge)
+ {
+ Tracer.DebugFormat("Consumer[{0}] rolling back delivered list " +
+ "({1}) on transport interrupt",
+ ConsumerId, deliveredMessages.Count);
+
+ // allow redelivery
+ if (!this.info.Browser)
+ {
+ foreach (MessageDispatch dispatch in deliveredMessages)
+ {
+ this.session.Connection.RollbackDuplicate(this, dispatch.Message);
+ }
+ }
+ }
+ Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
+ this.info.ConsumerId, deliveredMessages.Count);
+ this.deliveredMessages.Clear();
+ this.pendingAck = null;
+ }
+
+ this.clearDeliveredList = false;
+ }
+ }
+ }
+
+ public virtual void Dispatch(MessageDispatch dispatch)
+ {
+ MessageListener listener = this.listener;
+ bool dispatchMessage = false;
+
+ try
+ {
+ ClearMessagesInProgress();
+ ClearDeliveredList();
+
+ lock(this.unconsumedMessages.SyncRoot)
+ {
+ if(!this.unconsumedMessages.Closed)
+ {
+ if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
+ {
+ if(listener != null && this.unconsumedMessages.Running)
+ {
+ if (RedeliveryExceeded(dispatch))
+ {
+ PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+ return;
+ }
+ else
+ {
+ dispatchMessage = true;
+ }
+ }
+ else
+ {
+ if (!this.unconsumedMessages.Running)
+ {
+ // delayed redelivery, ensure it can be re delivered
+ session.Connection.RollbackDuplicate(this, dispatch.Message);
+ }
+ this.unconsumedMessages.Enqueue(dispatch);
+ // TODO - Signal message available when we have that event hook.
+ }
+ }
+ else
+ {
+ // deal with duplicate delivery
+ ConsumerId consumerWithPendingTransaction;
+ if (RedeliveryExpectedInCurrentTransaction(dispatch, true))
+ {
+ Tracer.DebugFormat("Consumer[{0}] tracking transacted({1}) redelivery [{2}]",
+ ConsumerId, previouslyDeliveredMessages.TransactionId, dispatch.Message);
+ if (TransactedIndividualAck)
+ {
+ ImmediateIndividualTransactedAck(dispatch);
+ }
+ else
+ {
+ this.session.SendAck(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1));
+ }
+ }
+ else if ((consumerWithPendingTransaction = RedeliveryPendingInCompetingTransaction(dispatch)) != null)
+ {
+ Tracer.WarnFormat("Consumer[{0}] delivering duplicate [{1}], pending transaction completion on ({1}) will rollback",
+ ConsumerId, dispatch.Message, consumerWithPendingTransaction);
+ this.session.Connection.RollbackDuplicate(this, dispatch.Message);
+ Dispatch(dispatch);
+ }
+ else
+ {
+ Tracer.WarnFormat("Consumer[{0}] suppressing duplicate delivery on connection, poison acking: ({1})",
+ ConsumerId, dispatch);
+ PosionAck(dispatch, "Suppressing duplicate delivery on connection, consumer " + ConsumerId);
+ }
+ }
+ }
+ }
+
+ if(dispatchMessage)
+ {
+ ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+
+ this.BeforeMessageIsConsumed(dispatch);
+
+ try
+ {
+ bool expired = (!IgnoreExpiration && message.IsExpired());
+
+ if(!expired)
+ {
+ listener(message);
+ }
+
+ this.AfterMessageIsConsumed(dispatch, expired);
+ }
+ catch(Exception e)
+ {
+ if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
+ {
+ // Schedule redelivery and possible dlq processing
+ dispatch.RollbackCause = e;
+ Rollback();
+ }
+ else
+ {
+ // Transacted or Client ack: Deliver the next message.
+ this.AfterMessageIsConsumed(dispatch, false);
+ }
+
[... 1012 lines stripped ...]