You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/23 22:53:51 UTC
svn commit: r1471140 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Author: tabish
Date: Tue Apr 23 20:53:51 2013
New Revision: 1471140
URL: http://svn.apache.org/r1471140
Log:
fixes for:
https://issues.apache.org/jira/browse/AMQNET-431
https://issues.apache.org/jira/browse/AMQNET-430
https://issues.apache.org/jira/browse/AMQNET-429
https://issues.apache.org/jira/browse/AMQNET-428
https://issues.apache.org/jira/browse/AMQNET-329
Allow acks to be sent lazily in cases where the Ack mode doesn't require them to be strict.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1471140&r1=1471139&r2=1471140&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Tue Apr 23 20:53:51 2013
@@ -71,6 +71,7 @@ namespace Apache.NMS.ActiveMQ
private bool nonBlockingRedelivery = false;
private Exception failureError;
+ private ThreadPoolExecutor executor;
private event MessageListener listener;
@@ -500,6 +501,12 @@ namespace Apache.NMS.ActiveMQ
}
}
+ if (this.executor != null)
+ {
+ this.executor.Shutdown();
+ this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
+ this.executor = null;
+ }
if (this.optimizedAckTimer != null)
{
this.OptimizedAckScheduledAckInterval = 0;
@@ -660,7 +667,12 @@ namespace Apache.NMS.ActiveMQ
if(ack != null)
{
- this.session.SendAck(ack);
+ if (this.executor == null)
+ {
+ this.executor = new ThreadPoolExecutor();
+ }
+
+ this.executor.QueueUserWorkItem(AsyncDeliverAck, ack);
}
else
{
@@ -669,6 +681,24 @@ namespace Apache.NMS.ActiveMQ
}
}
+ private void AsyncDeliverAck(object ack)
+ {
+ MessageAck pending = ack as MessageAck;
+ try
+ {
+ this.session.SendAck(pending, true);
+ }
+ catch
+ {
+ Tracer.ErrorFormat("Consumer {0} Failed to deliver async Ack {1}",
+ this.info.ConsumerId, pending);
+ }
+ finally
+ {
+ this.deliveringAcks.Value = false;
+ }
+ }
+
internal void InProgressClearRequired()
{
inProgressClearRequiredFlag = true;