You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/11 19:56:02 UTC

svn commit: r1617335 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs

Author: tabish
Date: Mon Aug 11 17:56:01 2014
New Revision: 1617335

URL: http://svn.apache.org/r1617335
Log:
Apply fix for potential deadlock on restore with active pull requests.
Fixes [AMQNET-487]. (See https://issues.apache.org/jira/browse/AMQNET-487)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1617335&r1=1617334&r2=1617335&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Mon Aug 11 17:56:01 2014
@@ -32,8 +32,8 @@ namespace Apache.NMS.ActiveMQ.State
     {
         private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
 
-        protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates = 
-			new Dictionary<ConnectionId, ConnectionState>();
+        protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates =
+            new Dictionary<ConnectionId, ConnectionState>();
 
         private bool isTrackTransactions;
         private bool isTrackTransactionProducers = true;
@@ -60,10 +60,10 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConnectionState cs;
 
-				if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
-				{
-					cs.RemoveTransactionState(info.TransactionId);
-				}
+                if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
+                    cs.RemoveTransactionState(info.TransactionId);
+                }
             }
         }
 
@@ -148,8 +148,8 @@ namespace Apache.NMS.ActiveMQ.State
                         {
                             if (Tracer.IsDebugEnabled)
                             {
-                                Tracer.Debug("rolling back potentially completed tx: " + 
-								             transactionState.Id);
+                                Tracer.Debug("rolling back potentially completed tx: " +
+                                             transactionState.Id);
                             }
                             toRollback.Add(transactionInfo);
                             continue;
@@ -167,7 +167,7 @@ namespace Apache.NMS.ActiveMQ.State
                     }
                     transport.Oneway(producerState.Info);
                 }
-    
+
                 foreach (Command command in transactionState.Commands)
                 {
                     if (Tracer.IsDebugEnabled)
@@ -176,7 +176,7 @@ namespace Apache.NMS.ActiveMQ.State
                     }
                     transport.Oneway(command);
                 }
-    
+
                 foreach (ProducerState producerState in transactionState.ProducerStates)
                 {
                     if (Tracer.IsDebugEnabled)
@@ -239,15 +239,15 @@ namespace Apache.NMS.ActiveMQ.State
             // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
             // recovery completes.
 
-			ConnectionState connectionState = null;
-			bool connectionInterruptionProcessingComplete = false;
+            ConnectionState connectionState = null;
+            bool connectionInterruptionProcessingComplete = false;
 
-			if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
-			{
-				connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
-			}
-			
-			// Restore the session's consumers
+            if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
+            {
+                connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
+            }
+
+            // Restore the session's consumers
             foreach(ConsumerState consumerState in sessionState.ConsumerStates)
             {
                 ConsumerInfo infoToSend = consumerState.Info;
@@ -255,13 +255,13 @@ namespace Apache.NMS.ActiveMQ.State
                 if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
-					lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
-					{
-						if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
-						{
-							connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
-						}
-					}
+                    lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+                    {
+                        if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+                        {
+                            connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+                        }
+                    }
                     infoToSend.PrefetchSize = 0;
                     if(Tracer.IsDebugEnabled)
                     {
@@ -316,10 +316,10 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConnectionState cs;
 
-				if(connectionStates.TryGetValue(info.ConnectionId, out cs))
-				{
-					cs.AddTempDestination(info);
-				}
+                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
+                    cs.AddTempDestination(info);
+                }
             }
             return TRACKED_RESPONSE_MARKER;
         }
@@ -329,8 +329,8 @@ namespace Apache.NMS.ActiveMQ.State
             if(info != null && info.Destination.IsTemporary)
             {
                 ConnectionState cs;
-				if(connectionStates.TryGetValue(info.ConnectionId, out cs))
-				{
+                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
                     cs.RemoveTempDestination(info.Destination);
                 }
             }
@@ -348,8 +348,8 @@ namespace Apache.NMS.ActiveMQ.State
                     if(connectionId != null)
                     {
                         ConnectionState cs;
-						
-						if(connectionStates.TryGetValue(connectionId, out cs))
+
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -373,9 +373,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-						ConnectionState cs = null;
-						
-						if(connectionStates.TryGetValue(connectionId, out cs))
+                        ConnectionState cs = null;
+
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -399,9 +399,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-						ConnectionState cs = null;
+                        ConnectionState cs = null;
 
-						if(connectionStates.TryGetValue(connectionId, out cs))
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -425,9 +425,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-						ConnectionState cs = null;
+                        ConnectionState cs = null;
 
-						if(connectionStates.TryGetValue(connectionId, out cs))
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -435,7 +435,7 @@ namespace Apache.NMS.ActiveMQ.State
                                 ss.RemoveConsumer(id);
                             }
 
-							cs.RecoveringPullConsumers.Remove(id);
+                            cs.RecoveringPullConsumers.Remove(id);
                         }
                     }
                 }
@@ -450,9 +450,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.SessionId.ParentId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.AddSession(info);
                     }
@@ -468,9 +468,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = id.ParentId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.RemoveSession(id);
                     }
@@ -483,16 +483,16 @@ namespace Apache.NMS.ActiveMQ.State
         {
             if(info != null)
             {
-				ConnectionState connState = new ConnectionState(info);
+                ConnectionState connState = new ConnectionState(info);
 
-				if(connectionStates.ContainsKey(info.ConnectionId))
-				{
-					connectionStates[info.ConnectionId] = connState;
-				}
-				else
-				{
-					connectionStates.Add(info.ConnectionId, connState);
-				}
+                if(connectionStates.ContainsKey(info.ConnectionId))
+                {
+                    connectionStates[info.ConnectionId] = connState;
+                }
+                else
+                {
+                    connectionStates.Add(info.ConnectionId, connState);
+                }
             }
 
             return TRACKED_RESPONSE_MARKER;
@@ -517,9 +517,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = producerId.ParentId.ParentId;
                     if(connectionId != null)
                     {
-						ConnectionState cs = null;
+                        ConnectionState cs = null;
 
-						if(connectionStates.TryGetValue(connectionId, out cs))
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             TransactionState transactionState = cs[send.TransactionId];
                             if(transactionState != null)
@@ -552,9 +552,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[ack.TransactionId];
                         if(transactionState != null)
@@ -575,9 +575,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.AddTransactionState(info.TransactionId);
                         TransactionState state = cs[info.TransactionId];
@@ -596,9 +596,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -619,9 +619,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -642,9 +642,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -665,9 +665,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -688,9 +688,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-					ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-					if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = cs[info.TransactionId];
                         if(transactionState != null)
@@ -760,57 +760,61 @@ namespace Apache.NMS.ActiveMQ.State
         public int MaxCacheSize
         {
             get { return maxCacheSize; }
-            set 
-			{ 
-				this.maxCacheSize = value; 
-				this.messageCache.MaxCacheSize = maxCacheSize;
-			}
+            set
+            {
+                this.maxCacheSize = value;
+                this.messageCache.MaxCacheSize = maxCacheSize;
+            }
         }
 
         public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
         {
-			ConnectionState connectionState = null;
+            ConnectionState connectionState = null;
 
-			if(connectionStates.TryGetValue(connectionId, out connectionState))
+            if(connectionStates.TryGetValue(connectionId, out connectionState))
             {
                 connectionState.ConnectionInterruptProcessingComplete = true;
 
-				lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
-				{
-					foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
-					{
-						ConsumerControl control = new ConsumerControl();
-						control.ConsumerId = entry.Key;
-						control.Prefetch = entry.Value.PrefetchSize;
-						control.Destination = entry.Value.Destination;
-						try
-						{
-							if(Tracer.IsDebugEnabled)
-							{
-								Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
-											 " with: " + control.Prefetch);
-							}
-							transport.Oneway(control);
-						}
-						catch(Exception ex)
-						{
-							if(Tracer.IsDebugEnabled)
-							{
-								Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
-											 " with: " + control.Prefetch + "Error: " + ex.Message);
-							}
-						}
-					}
-					connectionState.RecoveringPullConsumers.Clear();
-				}
+                Dictionary<ConsumerId, ConsumerInfo> consumersToRestorePrefetchOn;
+
+                lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+                {
+                    consumersToRestorePrefetchOn = new Dictionary<ConsumerId, ConsumerInfo>(connectionState.RecoveringPullConsumers);
+                    connectionState.RecoveringPullConsumers.Clear();
+                }
+
+                foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in consumersToRestorePrefetchOn)
+                {
+                    ConsumerControl control = new ConsumerControl();
+                    control.ConsumerId = entry.Key;
+                    control.Prefetch = entry.Value.PrefetchSize;
+                    control.Destination = entry.Value.Destination;
+                    try
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch);
+                        }
+                        transport.Oneway(control);
+                    }
+                    catch(Exception ex)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch + "Error: " + ex.Message);
+                        }
+                    }
+                }
             }
         }
 
         public void TransportInterrupted(ConnectionId id)
         {
-			ConnectionState connection = null;
+            ConnectionState connection = null;
 
-			if(connectionStates.TryGetValue(id, out connection))
+            if(connectionStates.TryGetValue(id, out connection))
             {
                 connection.ConnectionInterruptProcessingComplete = false;
             }