You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/20 00:44:29 UTC

svn commit: r1470081 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs main/csharp/Session.cs test/csharp/OptimizedAckTest.cs

Author: tabish
Date: Fri Apr 19 22:44:28 2013
New Revision: 1470081

URL: http://svn.apache.org/r1470081
Log:
fixes for:
https://issues.apache.org/jira/browse/AMQNET-431
https://issues.apache.org/jira/browse/AMQNET-430
https://issues.apache.org/jira/browse/AMQNET-429
https://issues.apache.org/jira/browse/AMQNET-428
https://issues.apache.org/jira/browse/AMQNET-329

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1470081&r1=1470080&r2=1470081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Fri Apr 19 22:44:28 2013
@@ -20,6 +20,7 @@ using System.Collections.Generic;
 using System.Collections.Specialized;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Util;
+using Apache.NMS.ActiveMQ.Threads;
 using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ
@@ -52,6 +53,7 @@ namespace Apache.NMS.ActiveMQ
 		private int redeliveryTimeout = 500;
 		protected bool disposed = false;
 		private long lastDeliveredSequenceId = 0;
+		private int ackCounter = 0;
 		private int deliveredCounter = 0;
 		private int additionalWindowSize = 0;
 		private long redeliveryDelay = 0;
@@ -59,12 +61,21 @@ namespace Apache.NMS.ActiveMQ
 		private volatile bool synchronizationRegistered = false;
 		private bool clearDispatchList = false;
 		private bool inProgressClearRequiredFlag;
+		private bool optimizeAcknowledge;
+		private DateTime optimizeAckTimestamp = DateTime.Now;
+	    private long optimizeAcknowledgeTimeOut = 0;
+	    private long optimizedAckScheduledAckInterval = 0;
+	    private Timer optimizedAckTimer;
+	    private long failoverRedeliveryWaitPeriod = 0;
+	    private bool transactedIndividualAck = false;
+	    private bool nonBlockingRedelivery = false;
 
         private Exception failureError;
 
 		private event MessageListener listener;
 
 		private IRedeliveryPolicy redeliveryPolicy;
+		private PreviouslyDeliveredMap previouslyDeliveredMessages;
 
 		// Constructor internal to prevent clients from creating an instance.
 		internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
@@ -140,6 +151,19 @@ namespace Apache.NMS.ActiveMQ
 				URISupport.SetProperties(this.info, options);
 				URISupport.SetProperties(this, customConsumerOptions, "nms.");
 			}
+
+	        this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge && 
+									   session.IsAutoAcknowledge && !this.info.Browser;
+	        
+			if (this.optimizeAcknowledge) {
+	            this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
+	            OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
+	        }
+
+	        this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
+	        this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
+	        this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
+	        this.transactedIndividualAck = session.Connection.TransactedIndividualAck || this.nonBlockingRedelivery;
 		}
 
 		~MessageConsumer()
@@ -200,6 +224,74 @@ namespace Apache.NMS.ActiveMQ
             set { this.failureError = value; }
         }
 
+		// Whether this consumer batches its acknowledgements. Turning the option off
+		// while it is currently on calls DeliverAcks() before the new value is stored.
+		public bool OptimizeAcknowledge
+		{
+			get
+			{
+				return this.optimizeAcknowledge;
+			}
+			set
+			{
+				bool beingDisabled = this.optimizeAcknowledge && !value;
+				if (beingDisabled)
+				{
+					DeliverAcks();
+				}
+
+				this.optimizeAcknowledge = value;
+			}
+		}
+
+		// Timeout, in milliseconds, after which IsOptimizedAckTime() reports that the
+		// batched acks are due. A value <= 0 disables the time based check.
+		public long OptimizeAcknowledgeTimeOut
+		{
+			get { return this.optimizeAcknowledgeTimeOut; }
+			set { this.optimizeAcknowledgeTimeOut = value; }
+		}
+	    
+		// Interval used for the timer that periodically invokes DoOptimizedAck
+		// (presumably milliseconds, as for System.Threading.Timer -- confirm).
+		// Every assignment first disposes any existing timer, then starts a new one
+		// only when optimizeAcknowledge is enabled and the interval is positive.
+		public long OptimizedAckScheduledAckInterval
+		{
+			get { return this.optimizedAckScheduledAckInterval; }
+			set 
+			{ 
+				this.optimizedAckScheduledAckInterval = value; 
+
+		        if (this.optimizedAckTimer != null) 
+				{
+					// Dispose the running timer and wait up to five seconds for the
+					// shutdown event to be signalled before dropping the reference.
+					AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+					this.optimizedAckTimer.Dispose(shutdownEvent);
+					if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(5000), false))
+					{
+						Tracer.WarnFormat("Consumer[{0}]: Optimized Ack Timer Task didn't shutdown properly.", this.info.ConsumerId);
+					}
+
+					this.optimizedAckTimer = null;
+		        }
+
+		        // Should we periodically send out all outstanding acks.
+		        if (this.optimizeAcknowledge && this.optimizedAckScheduledAckInterval > 0)
+				{
+					// The interval is used both as the initial delay and as the period.
+					this.optimizedAckTimer = new Timer(
+						new TimerCallback(DoOptimizedAck),
+						null,
+						optimizedAckScheduledAckInterval,
+						optimizedAckScheduledAckInterval);
+				}
+			}
+		}
+	    
+		// Time, in milliseconds, that WaitForRedeliveries() allows for previously
+		// delivered messages to be replayed. A value <= 0 disables the wait.
+		public long FailoverRedeliveryWaitPeriod 
+		{
+			get { return this.failoverRedeliveryWaitPeriod; }
+			set { this.failoverRedeliveryWaitPeriod = value; }
+		}
+	    
+		// When true, transacted deliveries are acknowledged one message at a time via
+		// ImmediateIndividualTransactedAck() instead of a DeliveredAck.
+		public bool TransactedIndividualAck
+		{
+			get { return this.transactedIndividualAck; }
+			set { this.transactedIndividualAck = value; }
+		}
+	    
+		// Non-blocking redelivery flag, initialised from the connection in the constructor.
+		// NOTE(review): the constructor also turns transactedIndividualAck on when this
+		// flag is set, but this setter does not -- confirm whether that is intended.
+		public bool NonBlockingRedelivery
+		{
+			get { return this.nonBlockingRedelivery; }
+			set { this.nonBlockingRedelivery = value; }
+		}
+
 		#endregion
 
 		#region IMessageConsumer Members
@@ -408,6 +500,29 @@ namespace Apache.NMS.ActiveMQ
 					}
 				}
 
+				if (this.optimizedAckTimer != null)
+				{
+					this.OptimizedAckScheduledAckInterval = 0;
+				}
+
+	            if (this.session.IsClientAcknowledge)
+				{
+	                if (!this.info.Browser) 
+					{
+	                    // rollback duplicates that aren't acknowledged
+	                    LinkedList<MessageDispatch> temp = null;
+					    lock(this.dispatchedMessages)
+						{
+	                        temp = new LinkedList<MessageDispatch>(this.dispatchedMessages);
+	                    }
+	                    foreach (MessageDispatch old in temp) 
+						{
+	                        this.session.Connection.RollbackDuplicate(this, old.Message);
+	                    }
+	                    temp.Clear();
+	                }
+	            }
+
 				if(!this.session.IsTransacted)
 				{
 					lock(this.dispatchedMessages)
@@ -419,6 +534,15 @@ namespace Apache.NMS.ActiveMQ
 				this.session.RemoveConsumer(this);
 				this.unconsumedMessages.Close();
 
+	            MessageDispatch[] unconsumed = unconsumedMessages.RemoveAll();
+	            if (!this.info.Browser) 
+				{
+	                foreach (MessageDispatch old in unconsumed) 
+					{
+	                    // ensure we don't filter this as a duplicate
+	                    session.Connection.RollbackDuplicate(this, old.Message);
+	                }
+	            }
 				if(Tracer.IsDebugEnabled)
 				{
 					Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
@@ -470,14 +594,7 @@ namespace Apache.NMS.ActiveMQ
 				return;
 			}
 
-			MessageAck ack = new MessageAck();
-
-			ack.AckType = (byte) AckType.IndividualAck;
-			ack.ConsumerId = this.info.ConsumerId;
-			ack.Destination = dispatch.Destination;
-			ack.LastMessageId = dispatch.Message.MessageId;
-			ack.MessageCount = 1;
-
+			MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
 			Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
 			this.session.SendAck(ack);
 		}
@@ -511,39 +628,6 @@ namespace Apache.NMS.ActiveMQ
 			this.unconsumedMessages.Stop();
 		}
 
-		internal void InProgressClearRequired()
-		{
-			inProgressClearRequiredFlag = true;
-			// deal with delivered messages async to avoid lock contention with in progress acks
-			clearDispatchList = true;
-		}
-
-		internal void ClearMessagesInProgress()
-		{
-			if(inProgressClearRequiredFlag)
-			{
-				// Called from a thread in the ThreadPool, so we wait until we can
-				// get a lock on the unconsumed list then we clear it.
-				lock(this.unconsumedMessages)
-				{
-					if(inProgressClearRequiredFlag)
-					{
-						if(Tracer.IsDebugEnabled)
-						{
-							Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
-										 this.unconsumedMessages.Count + ") on transport interrupt");
-						}
-
-						this.unconsumedMessages.Clear();
-
-						// allow dispatch on this connection to resume
-						this.session.Connection.TransportInterruptionProcessingComplete();
-						this.inProgressClearRequiredFlag = false;
-					}
-				}
-			}
-		}
-
 		public void DeliverAcks()
 		{
 			MessageAck ack = null;
@@ -554,11 +638,12 @@ namespace Apache.NMS.ActiveMQ
 				{
 					lock(this.dispatchedMessages)
 					{
-						ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
+						ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
 						if(ack != null)
 						{
                             Tracer.Debug("Consumer - DeliverAcks clearing the Dispatch list");
 							this.dispatchedMessages.Clear();
+							this.ackCounter = 0;
 						}
 						else
 						{
@@ -575,16 +660,7 @@ namespace Apache.NMS.ActiveMQ
 
 				if(ack != null)
 				{
-					MessageAck ackToSend = ack;
-
-					try
-					{
-						this.session.SendAck(ackToSend);
-					}
-					catch(Exception e)
-					{
-						Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
-					}
+					this.session.SendAck(ack);
 				}
 				else
 				{
@@ -593,42 +669,175 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		public virtual void Dispatch(MessageDispatch dispatch)
+		internal void InProgressClearRequired()
 		{
-			MessageListener listener = this.listener;
-			bool dispatchMessage = false;
+			inProgressClearRequiredFlag = true;
+			// deal with delivered messages async to avoid lock contention with in progress acks
+			clearDispatchList = true;
+		}
 
-			try
+		internal void ClearMessagesInProgress()
+		{
+			if(inProgressClearRequiredFlag)
 			{
+				// Called from a thread in the ThreadPool, so we wait until we can
+				// get a lock on the unconsumed list then we clear it.
 				lock(this.unconsumedMessages.SyncRoot)
 				{
-					if(this.clearDispatchList)
+					if(inProgressClearRequiredFlag)
 					{
-						// we are reconnecting so lets flush the in progress messages
-						this.clearDispatchList = false;
-						this.unconsumedMessages.Clear();
+						if(Tracer.IsDebugEnabled)
+						{
+							Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+										 this.unconsumedMessages.Count + ") on transport interrupt");
+						}
 
-						if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+    	                // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
+	                    MessageDispatch[] list = this.unconsumedMessages.RemoveAll();
+	                    if (!this.info.Browser) 
 						{
-							// on resumption a pending delivered ack will be out of sync with
-							// re-deliveries.
-							if(Tracer.IsDebugEnabled)
+	                        foreach (MessageDispatch old in list) 
 							{
-								Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
-							}
-							this.pendingAck = null;
-						}
+	                            session.Connection.RollbackDuplicate(this, old.Message);
+	                        }
+	                    }
+
+						// allow dispatch on this connection to resume
+						this.session.Connection.TransportInterruptionProcessingComplete();
+						this.inProgressClearRequiredFlag = false;
 					}
+				}
+			}
+		}
+
+	    // Handles the dispatched-message list after InProgressClearRequired() has set
+	    // the clearDispatchList flag. For a transacted session the delivered messages are
+	    // recorded in previouslyDeliveredMessages (marked as not yet redelivered);
+	    // otherwise the list and any pending ack are simply dropped.
+	    private void ClearDispatchList() 
+		{
+	        if (this.clearDispatchList) 
+			{
+				lock(this.dispatchedMessages)
+				{
+					// Re-check under the lock: the flag may have been cleared already.
+	                if (this.clearDispatchList) 
+					{
+	                    if (dispatchedMessages.Count != 0) 
+						{
+	                        if (session.IsTransacted) 
+							{
+	                            if (Tracer.IsDebugEnabled) 
+								{
+									Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
+									                   this.info.ConsumerId, dispatchedMessages.Count);
+	                            }
+	                            if (previouslyDeliveredMessages == null) 
+								{
+	                                previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.TransactionContext.TransactionId);
+	                            }
+	                            foreach (MessageDispatch delivered in dispatchedMessages) 
+								{
+									// false == not (yet) seen again; Dispatch() records true on a duplicate.
+	                                this.previouslyDeliveredMessages.Add(delivered.Message.MessageId, false);
+	                            }
+	                        } 
+							else 
+							{
+	                            if (Tracer.IsDebugEnabled) 
+								{
+									Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
+									                   this.info.ConsumerId, dispatchedMessages.Count);
+	                            }
+								this.dispatchedMessages.Clear();
+	                            this.pendingAck = null;
+	                        }
+	                    }
+	                    this.clearDispatchList = false;
+	                }
+	            }
+	        }
+	    }
+
+		public virtual void Dispatch(MessageDispatch dispatch)
+		{
+			MessageListener listener = this.listener;
+			bool dispatchMessage = false;
 
+			try
+			{
+				ClearMessagesInProgress();
+				ClearDispatchList();
+
+				lock(this.unconsumedMessages.SyncRoot)
+				{
 					if(!this.unconsumedMessages.Closed)
 					{
-						if(listener != null && this.unconsumedMessages.Running)
+	                    if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message)) 
 						{
-							dispatchMessage = true;
+							if(listener != null && this.unconsumedMessages.Running)
+							{
+								dispatchMessage = true;
+							}
+							else
+							{
+	                            if (!this.unconsumedMessages.Running) 
+								{
+	                                // delayed redelivery, ensure it can be re delivered
+	                                session.Connection.RollbackDuplicate(this, dispatch.Message);
+	                            }
+								this.unconsumedMessages.Enqueue(dispatch);
+							}
 						}
-						else
+						else 
 						{
-							this.unconsumedMessages.Enqueue(dispatch);
+	                        if (!this.session.IsTransacted) 
+							{
+	                            Tracer.Warn("Duplicate dispatch on connection: " + session.Connection.ConnectionId +
+	                                        " to consumer: " + ConsumerId + ", ignoring (auto acking) duplicate: " + dispatch);
+	                            MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+	                            session.SendAck(ack);
+	                        } 
+							else
+							{
+	                            if (Tracer.IsDebugEnabled)
+								{
+									Tracer.DebugFormat("Consumer[{0}]: tracking transacted redelivery of duplicate: {1}",
+									                   this.info.ConsumerId, dispatch.Message);
+	                            }
+	                            bool needsPoisonAck = false;
+	                            lock(this.dispatchedMessages)
+								{
+	                                if (previouslyDeliveredMessages != null) 
+									{
+	                                    previouslyDeliveredMessages.Add(dispatch.Message.MessageId, true);
+	                                } 
+									else 
+									{
+	                                    // delivery while pending redelivery to another consumer on the same connection
+	                                    // not waiting for redelivery will help here
+	                                    needsPoisonAck = true;
+	                                }
+	                            }
+	                            if (needsPoisonAck) 
+								{
+	                                // Duplicate arrived with no redelivery tracking on this consumer:
+	                                // ack it as poison, carrying the reason along with the ack.
+	                                MessageAck poisonAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
+	                                poisonAck.FirstMessageId = dispatch.Message.MessageId;
+									BrokerError cause = new BrokerError();
+									cause.ExceptionClass = "javax.jms.JMSException";
+									cause.Message = "Duplicate dispatch with transacted redeliver pending on another consumer, connection: " + 
+													session.Connection.ConnectionId;
+									// The cause was previously built but never attached to the ack.
+									poisonAck.PoisonCause = cause;
+	                                Tracer.Warn("Acking duplicate delivery as poison, redelivery must be pending to another" +
+	                                            " consumer on this connection, failoverRedeliveryWaitPeriod=" +
+	                                            failoverRedeliveryWaitPeriod + ". Message: " + dispatch + ", poisonAck: " + poisonAck);
+	                                this.session.SendAck(poisonAck);
+	                            } 
+								else 
+								{
+	                                if (transactedIndividualAck) 
+									{
+	                                    ImmediateIndividualTransactedAck(dispatch);
+	                                } 
+									else 
+									{
+	                                    this.session.SendAck(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1));
+	                                }
+	                            }
+	                        }
 						}
 					}
 				}
@@ -654,7 +863,8 @@ namespace Apache.NMS.ActiveMQ
 					{
 						if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
 						{
-							// Redeliver the message
+                            // Schedule redelivery and possible dlq processing
+                            dispatch.RollbackCause = e;
                             Rollback();
 						}
 						else
@@ -826,11 +1036,40 @@ namespace Apache.NMS.ActiveMQ
 
 				if(this.session.IsTransacted)
 				{
-					this.AckLater(dispatch, AckType.DeliveredAck);
+                	if (this.transactedIndividualAck) 
+					{
+                    	ImmediateIndividualTransactedAck(dispatch);
+                	} 
+					else 
+					{
+						this.AckLater(dispatch, AckType.DeliveredAck);
+                	}
 				}
 			}
 		}
 
+		// Decides whether the batched (optimized) acks should be sent now: either the
+		// outstanding count has reached 65% of the prefetch size, or the configured
+		// optimizeAcknowledgeTimeOut has elapsed since optimizeAckTimestamp.
+		private bool IsOptimizedAckTime()
+		{
+			// evaluate both expired and normal msgs as otherwise consumer may get stalled
+			int outstanding = ackCounter + deliveredCounter;
+			bool windowNearlyFull = outstanding >= (this.info.PrefetchSize * .65);
+			if (windowNearlyFull)
+			{
+				return true;
+			}
+
+			// No timeout configured, so only the count based check applies.
+			if (optimizeAcknowledgeTimeOut <= 0)
+			{
+				return false;
+			}
+
+			TimeSpan timeout = TimeSpan.FromMilliseconds(optimizeAcknowledgeTimeOut);
+			return DateTime.Now >= optimizeAckTimestamp + timeout;
+		}
+
 		public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
 		{
 			if(this.unconsumedMessages.Closed)
@@ -845,7 +1084,7 @@ namespace Apache.NMS.ActiveMQ
 					this.dispatchedMessages.Remove(dispatch);
 				}
 
-				AckLater(dispatch, AckType.DeliveredAck);
+				Acknowledge(dispatch, AckType.DeliveredAck);
 			}
 			else
 			{
@@ -861,12 +1100,42 @@ namespace Apache.NMS.ActiveMQ
 						{
 							if(this.dispatchedMessages.Count != 0)
 							{
-								MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-								if(ack != null)
+	                            if (this.optimizeAcknowledge) 
 								{
-									this.dispatchedMessages.Clear();
-									this.session.SendAck(ack);
-								}
+	                                this.ackCounter++;
+
+	                                if (IsOptimizedAckTime())
+									{
+	                                    MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+	                                    if (ack != null) 
+										{
+	                                        this.dispatchedMessages.Clear();
+	                                        this.ackCounter = 0;
+	                                        this.session.SendAck(ack);
+	                                        this.optimizeAckTimestamp = DateTime.Now;
+	                                    }
+	                                    // as a further optimization send the ack for expired msgs when
+	                                    // there are any.  This resets the deliveredCounter to 0 so 
+										// that we won't send standard acks with every msg just
+	                                    // because the deliveredCounter is just below 0.5 * prefetch 
+										// as used in AckLater()
+	                                    if (this.pendingAck != null && this.deliveredCounter > 0) 
+										{
+	                                        this.session.SendAck(pendingAck);
+	                                        this.pendingAck = null;
+	                                        this.deliveredCounter = 0;
+	                                    }
+	                                }
+	                            }
+								else 
+								{
+	                                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+	                                if (ack != null) 
+									{
+	                                    this.dispatchedMessages.Clear();
+	                                    this.session.SendAck(ack);
+	                                }
+	                            }
 							}
 						}
 						this.deliveringAcks.Value = false;
@@ -907,15 +1176,8 @@ namespace Apache.NMS.ActiveMQ
 				}
 
 				MessageDispatch dispatch = this.dispatchedMessages.First.Value;
-				MessageAck ack = new MessageAck();
-
-				ack.AckType = (byte) type;
-				ack.ConsumerId = this.info.ConsumerId;
-				ack.Destination = dispatch.Destination;
-				ack.LastMessageId = dispatch.Message.MessageId;
-				ack.MessageCount = this.dispatchedMessages.Count;
+				MessageAck ack = new MessageAck(dispatch, (byte) type, this.dispatchedMessages.Count);
 				ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
-
 				return ack;
 			}
 		}
@@ -926,27 +1188,14 @@ namespace Apache.NMS.ActiveMQ
 			// consumer got the message to expand the pre-fetch window
 			if(this.session.IsTransacted)
 			{
-				this.session.DoStartTransaction();
-
-				if(!synchronizationRegistered)
-				{
-                    Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
-                                       this.info.ConsumerId);
-					this.synchronizationRegistered = true;
-					this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
-				}
+				RegisterSync();
 			}
 
 			this.deliveredCounter++;
 
 			MessageAck oldPendingAck = pendingAck;
 
-			pendingAck = new MessageAck();
-			pendingAck.AckType = (byte) type;
-			pendingAck.ConsumerId = this.info.ConsumerId;
-			pendingAck.Destination = dispatch.Destination;
-			pendingAck.LastMessageId = dispatch.Message.MessageId;
-			pendingAck.MessageCount = deliveredCounter;
+        	pendingAck = new MessageAck(dispatch, (byte) type, deliveredCounter);
 
 			if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
 			{
@@ -972,7 +1221,7 @@ namespace Apache.NMS.ActiveMQ
 						Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
 					}
 
-					this.session.Connection.Oneway(oldPendingAck);
+					this.session.SendAck(oldPendingAck);
 				}
 				else
 				{
@@ -983,17 +1232,59 @@ namespace Apache.NMS.ActiveMQ
 				}
 			}
 
-			if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
+	        // evaluate both expired and normal msgs as otherwise consumer may get stalled
+			if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter + this.ackCounter - this.additionalWindowSize))
 			{
-				this.session.Connection.Oneway(pendingAck);
+				this.session.SendAck(pendingAck);
 				this.pendingAck = null;
 				this.deliveredCounter = 0;
 				this.additionalWindowSize = 0;
 			}
 		}
 
+	    // Sends an IndividualAck for this single dispatch, tagged with the session's
+	    // current transaction id, through Connection.SyncRequest.
+	    private void ImmediateIndividualTransactedAck(MessageDispatch dispatch)
+		{
+	        // acks accumulate on the broker pending transaction completion to indicate
+	        // delivery status
+	        // RegisterSync() starts the transaction (if needed) before its id is read below.
+	        RegisterSync();
+	        MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+			ack.TransactionId = session.TransactionContext.TransactionId;
+	        this.session.Connection.SyncRequest(ack);
+	    }
+
+	    // For a transacted session: starts the transaction and registers a
+	    // MessageConsumerSynchronization for this consumer unless one is already
+	    // registered. Does nothing when the session is not transacted.
+	    private void RegisterSync()
+		{
+			if(!this.session.IsTransacted)
+			{
+				return;
+			}
+
+			this.session.DoStartTransaction();
+
+			if(synchronizationRegistered)
+			{
+				return;
+			}
+
+			Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
+			                   this.info.ConsumerId);
+			this.synchronizationRegistered = true;
+			MessageConsumerSynchronization sync = new MessageConsumerSynchronization(this);
+			this.session.TransactionContext.AddSynchronization(sync);
+		}
+
+	    // Sends an ack of the given type covering just this one dispatch, then removes
+	    // the dispatch from dispatchedMessages (under that list's lock).
+	    private void Acknowledge(MessageDispatch dispatch, AckType ackType)
+		{
+	        MessageAck ack = new MessageAck(dispatch, (byte) ackType, 1);
+	        this.session.SendAck(ack);
+	        lock(this.dispatchedMessages)
+			{
+	            dispatchedMessages.Remove(dispatch);
+	        }
+	    }
+
 		internal void Acknowledge()
 		{
+        	ClearDispatchList();
+        	WaitForRedeliveries();
+
 			lock(this.dispatchedMessages)
 			{
 				// Acknowledge all messages so far.
@@ -1006,6 +1297,7 @@ namespace Apache.NMS.ActiveMQ
 
 				if(this.session.IsTransacted)
 				{
+                	RollbackOnFailedRecoveryRedelivery();
                     if (!this.session.TransactionContext.InTransaction)
                     {
                         this.session.DoStartTransaction();
@@ -1032,6 +1324,7 @@ namespace Apache.NMS.ActiveMQ
 			lock(this.dispatchedMessages)
 			{
 				this.dispatchedMessages.Clear();
+				ClearPreviouslyDelivered();
 			}
 
 			this.redeliveryDelay = 0;
@@ -1041,8 +1334,26 @@ namespace Apache.NMS.ActiveMQ
 		{
 			lock(this.unconsumedMessages.SyncRoot)
 			{
+	            if (this.optimizeAcknowledge) 
+				{
+	                // remove messages read but not acked at the broker yet through optimizeAcknowledge
+	                if (!this.info.Browser) 
+					{
+	                    lock(this.dispatchedMessages)
+						{
+	                        for (int i = 0; (i < this.dispatchedMessages.Count) && (i < ackCounter); i++)
+							{
+	                            // ensure we don't filter this as a duplicate
+								MessageDispatch dispatch = this.dispatchedMessages.Last.Value;
+								this.dispatchedMessages.RemoveLast();
+	                            session.Connection.RollbackDuplicate(this, dispatch.Message);
+	                        }
+	                    }
+	                }
+	            }
 				lock(this.dispatchedMessages)
 				{
+                	RollbackPreviouslyDeliveredAndNotRedelivered();
 					if(this.dispatchedMessages.Count == 0)
 					{
                         Tracer.DebugFormat("Consumer {0} Rolled Back, no dispatched Messages",
@@ -1062,20 +1373,24 @@ namespace Apache.NMS.ActiveMQ
 					{
 						// Allow the message to update its internal to reflect a Rollback.
 						dispatch.Message.OnMessageRollback();
+                    	// ensure we don't filter this as a duplicate
+                    	session.Connection.RollbackDuplicate(this, dispatch.Message);
 					}
 
 					if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
 					   lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
 					{
 						// We need to NACK the messages so that they get sent to the DLQ.
-						MessageAck ack = new MessageAck();
-
-						ack.AckType = (byte) AckType.PoisonAck;
-						ack.ConsumerId = this.info.ConsumerId;
-						ack.Destination = lastMd.Destination;
-						ack.LastMessageId = lastMd.Message.MessageId;
-						ack.MessageCount = this.dispatchedMessages.Count;
-						ack.FirstMessageId = firstMsgId;
+                    	MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, dispatchedMessages.Count);
+                    	
+						if (lastMd.RollbackCause != null)
+						{
+							BrokerError cause = new BrokerError();
+							cause.ExceptionClass = "javax.jms.JMSException";
+							cause.Message = lastMd.RollbackCause.Message;
+							ack.PoisonCause = cause;
+						}
+                    	ack.FirstMessageId = firstMsgId;
 
 						this.session.SendAck(ack);
 
@@ -1083,21 +1398,16 @@ namespace Apache.NMS.ActiveMQ
 						additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
 
 						this.redeliveryDelay = 0;
+                    	this.deliveredCounter -= this.dispatchedMessages.Count;
+                    	this.dispatchedMessages.Clear();
 					}
 					else
 					{
 						// We only send a RedeliveryAck after the first redelivery
 						if(currentRedeliveryCount > 0)
 						{
-							MessageAck ack = new MessageAck();
-
-							ack.AckType = (byte) AckType.RedeliveredAck;
-							ack.ConsumerId = this.info.ConsumerId;
-							ack.Destination = lastMd.Destination;
-							ack.LastMessageId = lastMd.Message.MessageId;
-							ack.MessageCount = this.dispatchedMessages.Count;
+                        	MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, dispatchedMessages.Count);
 							ack.FirstMessageId = firstMsgId;
-
 							this.session.SendAck(ack);
 						}
 
@@ -1115,6 +1425,9 @@ namespace Apache.NMS.ActiveMQ
                             this.unconsumedMessages.EnqueueFirst(dispatch);
 						}
 
+						this.deliveredCounter -= this.dispatchedMessages.Count;
+						this.dispatchedMessages.Clear();
+
 						if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
 						{
 							DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
@@ -1125,9 +1438,6 @@ namespace Apache.NMS.ActiveMQ
 							Start();
 						}
 					}
-
-					this.deliveredCounter -= this.dispatchedMessages.Count;
-					this.dispatchedMessages.Clear();
 				}
 			}
 
@@ -1237,6 +1547,144 @@ namespace Apache.NMS.ActiveMQ
             return this.info.Destination.Equals(dest);
         }
 
+	    // Callback entry point for optimized acknowledgement. The supplied state
+	    // object is ignored; the method only invokes DeliverAcks().
+	    private void DoOptimizedAck(object state)
+		{
+			this.DeliverAcks();
+		}
+	    
+	    // Blocks the caller until every entry tracked in previouslyDeliveredMessages
+	    // has been flagged as replayed (value == true), or until
+	    // failoverRedeliveryWaitPeriod milliseconds have elapsed, whichever comes
+	    // first. Returns immediately when the wait period is not positive or when
+	    // nothing is being tracked.
+	    private void WaitForRedeliveries() 
+		{
+			if (failoverRedeliveryWaitPeriod <= 0 || previouslyDeliveredMessages == null)
+			{
+				return;
+			}
+
+			DateTime expiry = DateTime.Now + TimeSpan.FromMilliseconds(failoverRedeliveryWaitPeriod);
+			int numberNotReplayed;
+
+			do
+			{
+				numberNotReplayed = 0;
+				TransactionId transactionId = null;
+
+				lock(this.dispatchedMessages)
+				{
+					// Re-check under the lock: the map can be reset to null elsewhere.
+					if (previouslyDeliveredMessages != null)
+					{
+						// Capture the id while the map is known to exist so the trace
+						// below never dereferences a map that was cleared after the
+						// lock was released.
+						transactionId = previouslyDeliveredMessages.TransactionId;
+
+						foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
+						{
+							if (!entry.Value)
+							{
+								numberNotReplayed++;
+							}
+						}
+					}
+				}
+
+				if (numberNotReplayed > 0)
+				{
+					Tracer.Info("waiting for redelivery of " + numberNotReplayed + " in transaction: " +
+					            transactionId +  ", to consumer :" +
+					            this.info.ConsumerId);
+					Thread.Sleep((int) Math.Max(500, failoverRedeliveryWaitPeriod/4));
+				}
+			}
+			// Keep polling only while the deadline is still in the future. The previous
+			// test (expiry < DateTime.Now) was inverted and ended the loop after a
+			// single pass, so the wait period was never honoured.
+			while (numberNotReplayed > 0 && DateTime.Now < expiry);
+		}
+
+     	// Verifies that every message tracked in previouslyDeliveredMessages was
+     	// replayed (value == true). Each entry that was not replayed is logged at
+     	// debug level; if at least one exists a warning is logged and a
+     	// TransactionRolledBackException is thrown, because those messages may have
+     	// been dispatched elsewhere and the transaction is no longer valid.
+     	// Does nothing when no map is being tracked.
+     	// The visible caller invokes this while holding the dispatchedMessages lock.
+	    private void RollbackOnFailedRecoveryRedelivery() 
+		{
+			if (previouslyDeliveredMessages == null)
+			{
+				return;
+			}
+
+			int numberNotReplayed = 0;
+			foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
+			{
+				if (entry.Value)
+				{
+					continue;
+				}
+
+				numberNotReplayed++;
+				if (Tracer.IsDebugEnabled)
+				{
+					// Pass the ids as format arguments rather than concatenating them
+					// into the format string, so braces inside an id cannot be
+					// misread as format placeholders.
+					Tracer.DebugFormat("previously delivered message has not been replayed in transaction: {0} , messageId: {1}",
+					                   previouslyDeliveredMessages.TransactionId, entry.Key);
+				}
+			}
+
+			if (numberNotReplayed > 0)
+			{
+				String message = "rolling back transaction (" +
+				     previouslyDeliveredMessages.TransactionId + ") post failover recovery. " + numberNotReplayed +
+				     " previously delivered message(s) not replayed to consumer: " + this.info.ConsumerId;
+				Tracer.Warn(message);
+				throw new TransactionRolledBackException(message);
+			}
+		}
+
+	     // Expected to run with both unconsumedMessages and dispatchedMessages locked.
+	     // Any tracked message that was never re-delivered cannot be replayed to
+	     // this consumer on rollback, so it is removed from the dispatched list;
+	     // afterwards the tracking map itself is discarded.
+	    private void RollbackPreviouslyDeliveredAndNotRedelivered() 
+		{
+			if (previouslyDeliveredMessages == null)
+			{
+				return;
+			}
+
+			foreach(KeyValuePair<MessageId, bool> tracked in previouslyDeliveredMessages)
+			{
+				bool redelivered = tracked.Value;
+				if (redelivered)
+				{
+					continue;
+				}
+
+				RemoveFromDeliveredMessages(tracked.Key);
+			}
+
+			ClearPreviouslyDelivered();
+		}
+
+		// Must be called with dispatchedMessages locked.
+		// Looks for the first dispatched message whose MessageId equals the given
+		// key; when one is found the connection is notified through
+		// RollbackDuplicate and the entry is then removed from dispatchedMessages.
+		// Nothing happens when no entry matches.
+	    private void RemoveFromDeliveredMessages(MessageId key) 
+		{
+			MessageDispatch match = null;
+
+			foreach(MessageDispatch dispatch in this.dispatchedMessages)
+			{
+				if (!dispatch.Message.MessageId.Equals(key))
+				{
+					continue;
+				}
+
+				// Notify the connection first, then remember the entry; the removal
+				// itself happens after the enumeration has finished.
+				session.Connection.RollbackDuplicate(this, dispatch.Message);
+				match = dispatch;
+				break;
+			}
+
+			if (match == null)
+			{
+				return;
+			}
+
+			this.dispatchedMessages.Remove(match);
+		}
+
+	    // Empties and then drops the previouslyDeliveredMessages map, if one exists.
+	    // NOTE(review): the original comment says "deliveredMessages locked"; this
+	    // presumably refers to the dispatchedMessages lock - confirm against callers.
+	    private void ClearPreviouslyDelivered() 
+		{
+			if (previouslyDeliveredMessages == null)
+			{
+				return;
+			}
+
+			previouslyDeliveredMessages.Clear();
+			previouslyDeliveredMessages = null;
+		}
+
+		#region Transaction Redelivery Tracker
+
+		// Dictionary keyed by MessageId that also remembers the TransactionId it
+		// was created for. The consumer's redelivery checks count an entry whose
+		// value is false as "not replayed".
+		class PreviouslyDeliveredMap : Dictionary<MessageId, bool>
+		{
+			// Assigned once in the constructor and never changed afterwards.
+			private readonly TransactionId owningTransaction;
+
+			// Creates an empty map bound to the given transaction id.
+			public PreviouslyDeliveredMap(TransactionId transactionId)
+			{
+				this.owningTransaction = transactionId;
+			}
+
+			// The transaction id supplied at construction time.
+			public TransactionId TransactionId
+			{
+				get { return this.owningTransaction; }
+			}
+		}
+
+		#endregion
+
 		#region Nested ISyncronization Types
 
 		class MessageConsumerSynchronization : ISynchronization
@@ -1252,7 +1700,21 @@ namespace Apache.NMS.ActiveMQ
 			{
                 Tracer.DebugFormat("MessageConsumerSynchronization - BeforeEnd Called for Consumer {0}.",
                                    this.consumer.ConsumerId);
-				this.consumer.Acknowledge();
+
+                if (this.consumer.TransactedIndividualAck) 
+				{
+                    this.consumer.ClearDispatchList();
+                    this.consumer.WaitForRedeliveries();
+                    lock(this.consumer.dispatchedMessages)
+					{
+                        this.consumer.RollbackOnFailedRecoveryRedelivery();
+                    }
+                } 
+				else 
+				{
+					this.consumer.Acknowledge();
+                }
+
 				this.consumer.synchronizationRegistered = false;
 			}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1470081&r1=1470080&r2=1470081&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Apr 19 22:44:28 2013
@@ -966,6 +966,11 @@ namespace Apache.NMS.ActiveMQ
 
         internal void SendAck(MessageAck ack, bool lazy)
         {
+			if(Tracer.IsDebugEnabled)
+			{
+				Tracer.Debug("Session sending Ack: " + ack);
+			}
+
             if(lazy || connection.SendAcksAsync || this.IsTransacted )
             {
                 this.connection.Oneway(ack);

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs?rev=1470081&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs Fri Apr 19 22:44:28 2013
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+	public class OptimizedAckTest : NMSTestSupport
+	{
+		// Number of non-expiring messages every test expects to consume.
+		private const int ExpectedMessageCount = 60;
+
+		// Time-to-live for messages that must expire before the consumer starts.
+		private static readonly TimeSpan ShortTimeToLive = TimeSpan.FromMilliseconds(200);
+
+		// Time-to-live for messages that must survive for the whole test.
+		private static readonly TimeSpan LongTimeToLive = TimeSpan.FromMilliseconds(60000);
+
+		private Connection connection;
+
+		// Incremented by OnMessage and polled by the test thread, so every access
+		// goes through Interlocked to guarantee visibility across threads.
+		private int counter;
+
+		// Creates a connection with optimized acknowledgement enabled, an
+		// OptimizeAcknowledgeTimeOut of 0, and a prefetch of 100 for all
+		// destination kinds, and resets the received-message counter.
+		[SetUp]
+		public override void SetUp()
+		{
+			connection = (Connection) CreateConnection();
+			connection.OptimizeAcknowledge = true;
+			connection.OptimizeAcknowledgeTimeOut = 0;
+			connection.PrefetchPolicy.All = 100;
+
+			Interlocked.Exchange(ref counter, 0);
+		}
+
+		// Closes the connection if a test left it open, then runs the base teardown.
+		[TearDown]
+		public override void TearDown()
+		{
+			if(this.connection != null)
+			{
+				this.connection.Close();
+				this.connection = null;
+			}
+
+			base.TearDown();
+		}
+
+		// Asynchronous consumer: 45 quickly expiring messages are followed by 60
+		// long-lived ones; the listener must see exactly the 60 live messages.
+		[Test]
+		public void TestOptimizedAckWithExpiredMsgs()
+		{
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			IDestination destination = session.GetQueue("TestOptimizedAckWithExpiredMsgs");
+			IMessageConsumer consumer = session.CreateConsumer(destination);
+			IMessageProducer producer = session.CreateProducer(destination);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+			// Produce msgs that will expire quickly
+			SendMessages(session, producer, 45, ShortTimeToLive);
+
+			// Produce msgs that don't expire
+			SendMessages(session, producer, ExpectedMessageCount, LongTimeToLive);
+
+			Thread.Sleep(1000);  // let the batch of 45 expire.
+
+			consumer.Listener += OnMessage;
+			connection.Start();
+
+			// Poll once per second, for at most 60 seconds, until all messages arrived.
+			for (int i = 0; i < 60; ++i)
+			{
+				if (ReceivedCount() == ExpectedMessageCount)
+				{
+					break;
+				}
+				Thread.Sleep(1000);
+			}
+
+			Assert.AreEqual(ExpectedMessageCount, ReceivedCount(), "Failed to receive all expected messages");
+
+			// Cleanup
+			producer.Close();
+			consumer.Close();
+			session.Close();
+			connection.Close();
+		}
+
+		// Synchronous consumer: same message layout as the asynchronous test, but
+		// the messages are pulled with Receive().
+		[Test]
+		public void TestOptimizedAckWithExpiredMsgsSync()
+		{
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			// Each test uses its own queue so that long-lived messages left behind by
+			// a failed test cannot be delivered to a different test.
+			IDestination destination = session.GetQueue("TestOptimizedAckWithExpiredMsgsSync");
+			IMessageConsumer consumer = session.CreateConsumer(destination);
+			IMessageProducer producer = session.CreateProducer(destination);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+			// Produce msgs that will expire quickly
+			SendMessages(session, producer, 45, ShortTimeToLive);
+
+			// Produce msgs that don't expire
+			SendMessages(session, producer, ExpectedMessageCount, LongTimeToLive);
+
+			Thread.Sleep(1000);  // let the short-lived batch expire.
+			connection.Start();
+
+			ReceiveExpectedMessages(consumer);
+
+			// Cleanup
+			producer.Close();
+			consumer.Close();
+			session.Close();
+			connection.Close();
+		}
+
+		// Synchronous consumer with the expiring batch sandwiched between two
+		// batches of long-lived messages (56 + 4 = 60 live messages in total).
+		[Test]
+		public void testOptimizedAckWithExpiredMsgsSync2()
+		{
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			// Dedicated queue; see TestOptimizedAckWithExpiredMsgsSync for the reason.
+			IDestination destination = session.GetQueue("TestOptimizedAckWithExpiredMsgsSync2");
+			IMessageConsumer consumer = session.CreateConsumer(destination);
+			IMessageProducer producer = session.CreateProducer(destination);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+			// Produce msgs that don't expire
+			SendMessages(session, producer, 56, LongTimeToLive);
+
+			// Produce msgs that will expire quickly
+			SendMessages(session, producer, 44, ShortTimeToLive);
+
+			// Produce some more msgs that don't expire
+			SendMessages(session, producer, 4, LongTimeToLive);
+
+			Thread.Sleep(1000);  // let the short-lived batch expire.
+			connection.Start();
+
+			ReceiveExpectedMessages(consumer);
+
+			// Cleanup
+			producer.Close();
+			consumer.Close();
+			session.Close();
+			connection.Close();
+		}
+
+		// Sends 'count' empty, non-persistent, normal-priority text messages with
+		// the given time-to-live through the supplied producer.
+		private static void SendMessages(ISession session, IMessageProducer producer, int count, TimeSpan timeToLive)
+		{
+			for (int i = 0; i < count; i++)
+			{
+				ITextMessage message = session.CreateTextMessage();
+				producer.Send(message,
+				              MsgDeliveryMode.NonPersistent,
+				              MsgPriority.Normal,
+				              timeToLive);
+			}
+		}
+
+		// Pulls ExpectedMessageCount messages (5 second timeout each), asserting
+		// that none is missing, then asserts that nothing else arrives within
+		// 2 seconds.
+		private static void ReceiveExpectedMessages(IMessageConsumer consumer)
+		{
+			int received = 0;
+			for (; received < ExpectedMessageCount; ++received)
+			{
+				Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+			}
+
+			Assert.AreEqual(ExpectedMessageCount, received, "Failed to receive all expected messages");
+			Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(2000)));
+		}
+
+		// Reads the listener counter with full-fence semantics (a compare-exchange
+		// of 0 with 0 never changes the value, it only returns it).
+		private int ReceivedCount()
+		{
+			return Interlocked.CompareExchange(ref counter, 0, 0);
+		}
+
+		// Message listener: atomically counts each delivered message.
+		private void OnMessage(IMessage msg)
+		{
+			Interlocked.Increment(ref counter);
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OptimizedAckTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native