You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/12/22 17:09:26 UTC

svn commit: r1051965 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Author: tabish
Date: Wed Dec 22 16:09:26 2010
New Revision: 1051965

URL: http://svn.apache.org/viewvc?rev=1051965&view=rev
Log:
Part of fix for: https://issues.apache.org/jira/browse/AMQNET-298
Additional stale MessageAck filtering for: https://issues.apache.org/jira/browse/AMQNET-294

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1051965&r1=1051964&r2=1051965&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed Dec 22 16:09:26 2010
@@ -543,7 +543,10 @@ namespace Apache.NMS.ActiveMQ.Transport.
 					{
 						// Simulate response to RemoveInfo command or a MessageAck
                         // since it would be stale at this point.
-						OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                        if(command.ResponseRequired)
+                        {
+						    OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                        }
 						return;
 					}
 				}
@@ -553,6 +556,20 @@ namespace Apache.NMS.ActiveMQ.Transport.
 				{
 					try
 					{
+                        // Any Ack that was being sent when the connection dropped is now
+                        // stale, so we don't send it here: it would cause an unmatched ack
+                        // on the broker side and probably prevent a consumer from getting
+                        // any new messages.
+                        if(command.IsMessageAck && i > 0)
+                        {
+                            Tracer.Debug("Inflight MessageAck being dropped as stale.");
+                            if(command.ResponseRequired)
+                            {
+                                OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                            }
+                            return;
+                        }
+
 						// Wait for transport to be connected.
 						ITransport transport = ConnectedTransport;
 						DateTime start = DateTime.Now;
@@ -829,7 +846,14 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
 			foreach(Command command in tmpMap.Values)
 			{
-				t.Oneway(command);
+                if(command.IsMessageAck)
+                {
+                    Tracer.Debug("Stored MessageAck being dropped as stale.");
+                    OnCommand(this, new Response() { CorrelationId = command.CommandId });
+                    continue;
+                }
+
+                t.Oneway(command);
 			}
 		}
 
@@ -1320,6 +1344,8 @@ namespace Apache.NMS.ActiveMQ.Transport.
 				// get rid of unmanaged stuff
 			}
 
+            this.Stop();
+
 			disposed = true;
 		}