You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2011/10/20 01:29:35 UTC

svn commit: r1186568 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State: ConnectionState.cs ConnectionStateTracker.cs

Author: jgomes
Date: Wed Oct 19 23:29:35 2011
New Revision: 1186568

URL: http://svn.apache.org/viewvc?rev=1186568&view=rev
Log:
Add locking around iteration over recoveringPullConsumers.  Add a check for a pre-existing consumer ID to avoid throwing an exception while recovering.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs?rev=1186568&r1=1186567&r2=1186568&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs Wed Oct 19 23:29:35 2011
@@ -31,8 +31,7 @@ namespace Apache.NMS.ActiveMQ.State
 		private readonly AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
 		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
 	    private bool connectionInterruptProcessingComplete = true;
-		private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = 
-			new Dictionary<ConsumerId, ConsumerInfo>();
+		private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = new Dictionary<ConsumerId, ConsumerInfo>();
 
 		public ConnectionState(ConnectionInfo info)
 		{
@@ -179,7 +178,7 @@ namespace Apache.NMS.ActiveMQ.State
 				throw new ApplicationException("Disposed");
 			}
 		}
-		
+
 		public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
 		{
 			get { return this.recoveringPullConsumers; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs?rev=1186568&r1=1186567&r2=1186568&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs Wed Oct 19 23:29:35 2011
@@ -19,6 +19,7 @@ using System;
 using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
+using System.Collections;
 
 namespace Apache.NMS.ActiveMQ.State
 {
@@ -198,7 +199,13 @@ namespace Apache.NMS.ActiveMQ.State
                 if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
-					connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+					lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+					{
+						if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+						{
+							connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+						}
+					}
                     infoToSend.PrefetchSize = 0;
                     if(Tracer.IsDebugEnabled)
                     {
@@ -710,32 +717,34 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 connectionState.ConnectionInterruptProcessingComplete = true;
 
-                Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
-                foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
-                {
-                    ConsumerControl control = new ConsumerControl();
-                    control.ConsumerId = entry.Key;
-                    control.Prefetch = entry.Value.PrefetchSize;
-                    control.Destination = entry.Value.Destination;
-                    try
-                    {
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
-                                         " with: " + control.Prefetch);
-                        }
-                        transport.Oneway(control);
-                    }
-                    catch(Exception ex)
-                    {
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
-                                         " with: " + control.Prefetch + "Error: " + ex.Message);
-                        }
-                    }
-                }
-                stalledConsumers.Clear();
+				lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+				{
+					foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
+					{
+						ConsumerControl control = new ConsumerControl();
+						control.ConsumerId = entry.Key;
+						control.Prefetch = entry.Value.PrefetchSize;
+						control.Destination = entry.Value.Destination;
+						try
+						{
+							if(Tracer.IsDebugEnabled)
+							{
+								Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+											 " with: " + control.Prefetch);
+							}
+							transport.Oneway(control);
+						}
+						catch(Exception ex)
+						{
+							if(Tracer.IsDebugEnabled)
+							{
+								Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+											 " with: " + control.Prefetch + "Error: " + ex.Message);
+							}
+						}
+					}
+					connectionState.RecoveringPullConsumers.Clear();
+				}
             }
         }