You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/23 22:53:51 UTC

svn commit: r1471140 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs

Author: tabish
Date: Tue Apr 23 20:53:51 2013
New Revision: 1471140

URL: http://svn.apache.org/r1471140
Log:
fixes for:
https://issues.apache.org/jira/browse/AMQNET-431
https://issues.apache.org/jira/browse/AMQNET-430
https://issues.apache.org/jira/browse/AMQNET-429
https://issues.apache.org/jira/browse/AMQNET-428
https://issues.apache.org/jira/browse/AMQNET-329

Allow acks to be sent lazily in cases where the Ack mode doesn't require them to be strict. 

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1471140&r1=1471139&r2=1471140&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Tue Apr 23 20:53:51 2013
@@ -71,6 +71,7 @@ namespace Apache.NMS.ActiveMQ
 	    private bool nonBlockingRedelivery = false;
 
         private Exception failureError;
+		private ThreadPoolExecutor executor;
 
 		private event MessageListener listener;
 
@@ -500,6 +501,12 @@ namespace Apache.NMS.ActiveMQ
 					}
 				}
 
+	            if (this.executor != null) 
+				{
+					this.executor.Shutdown();
+					this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
+					this.executor = null;
+	            }
 				if (this.optimizedAckTimer != null)
 				{
 					this.OptimizedAckScheduledAckInterval = 0;
@@ -660,7 +667,12 @@ namespace Apache.NMS.ActiveMQ
 
 				if(ack != null)
 				{
-					this.session.SendAck(ack);
+	                if (this.executor == null) 
+					{
+						this.executor = new ThreadPoolExecutor();
+	                }
+
+					this.executor.QueueUserWorkItem(AsyncDeliverAck, ack);
 				}
 				else
 				{
@@ -669,6 +681,24 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
+		private void AsyncDeliverAck(object ack)
+		{
+			MessageAck pending = ack as MessageAck;
+			try
+			{
+				this.session.SendAck(pending, true);
+			}
+			catch
+			{
+				Tracer.ErrorFormat("Consumer {0} Failed to deliver async Ack {1}",
+                                   this.info.ConsumerId, pending);
+			}
+			finally
+			{
+				this.deliveringAcks.Value = false;
+			}
+		}
+
 		internal void InProgressClearRequired()
 		{
 			inProgressClearRequiredFlag = true;