You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/12/22 17:09:26 UTC
svn commit: r1051965 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
Author: tabish
Date: Wed Dec 22 16:09:26 2010
New Revision: 1051965
URL: http://svn.apache.org/viewvc?rev=1051965&view=rev
Log:
Part of fix for: https://issues.apache.org/jira/browse/AMQNET-298
Additional stale MessageAck filtering for: https://issues.apache.org/jira/browse/AMQNET-294
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1051965&r1=1051964&r2=1051965&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed Dec 22 16:09:26 2010
@@ -543,7 +543,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
{
// Simulate response to RemoveInfo command or a MessageAck
// since it would be stale at this point.
- OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ if(command.ResponseRequired)
+ {
+ OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ }
return;
}
}
@@ -553,6 +556,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
{
try
{
+ // Any Ack that was being sent when the connection dropped is now
+ // stale so we don't send it here as it would cause an unmatched ack
+ // on the broker side and probably prevent a consumer from getting
+ // any new messages.
+ if(command.IsMessageAck && i > 0)
+ {
+ Tracer.Debug("Inflight MessageAck being dropped as stale.");
+ if(command.ResponseRequired)
+ {
+ OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ }
+ return;
+ }
+
// Wait for transport to be connected.
ITransport transport = ConnectedTransport;
DateTime start = DateTime.Now;
@@ -829,7 +846,14 @@ namespace Apache.NMS.ActiveMQ.Transport.
foreach(Command command in tmpMap.Values)
{
- t.Oneway(command);
+ if(command.IsMessageAck)
+ {
+ Tracer.Debug("Stored MessageAck being dropped as stale.");
+ OnCommand(this, new Response() { CorrelationId = command.CommandId });
+ continue;
+ }
+
+ t.Oneway(command);
}
}
@@ -1320,6 +1344,8 @@ namespace Apache.NMS.ActiveMQ.Transport.
// get rid of unmanaged stuff
}
+ this.Stop();
+
disposed = true;
}