You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/08/11 19:56:02 UTC
svn commit: r1617335 -
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
Author: tabish
Date: Mon Aug 11 17:56:01 2014
New Revision: 1617335
URL: http://svn.apache.org/r1617335
Log:
Apply fix for potential deadlock on restore with active pull requests.
Fixes [AMQNET-AMQNET-487]. (See https://issues.apache.org/jira/browse/AMQNET-AMQNET-487)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1617335&r1=1617334&r2=1617335&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Mon Aug 11 17:56:01 2014
@@ -32,8 +32,8 @@ namespace Apache.NMS.ActiveMQ.State
{
private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
- protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates =
- new Dictionary<ConnectionId, ConnectionState>();
+ protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates =
+ new Dictionary<ConnectionId, ConnectionState>();
private bool isTrackTransactions;
private bool isTrackTransactionProducers = true;
@@ -60,10 +60,10 @@ namespace Apache.NMS.ActiveMQ.State
{
ConnectionState cs;
- if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
- {
- cs.RemoveTransactionState(info.TransactionId);
- }
+ if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
+ {
+ cs.RemoveTransactionState(info.TransactionId);
+ }
}
}
@@ -148,8 +148,8 @@ namespace Apache.NMS.ActiveMQ.State
{
if (Tracer.IsDebugEnabled)
{
- Tracer.Debug("rolling back potentially completed tx: " +
- transactionState.Id);
+ Tracer.Debug("rolling back potentially completed tx: " +
+ transactionState.Id);
}
toRollback.Add(transactionInfo);
continue;
@@ -167,7 +167,7 @@ namespace Apache.NMS.ActiveMQ.State
}
transport.Oneway(producerState.Info);
}
-
+
foreach (Command command in transactionState.Commands)
{
if (Tracer.IsDebugEnabled)
@@ -176,7 +176,7 @@ namespace Apache.NMS.ActiveMQ.State
}
transport.Oneway(command);
}
-
+
foreach (ProducerState producerState in transactionState.ProducerStates)
{
if (Tracer.IsDebugEnabled)
@@ -239,15 +239,15 @@ namespace Apache.NMS.ActiveMQ.State
// Restore the session's consumers but possibly in pull only (prefetch 0 state) till
// recovery completes.
- ConnectionState connectionState = null;
- bool connectionInterruptionProcessingComplete = false;
+ ConnectionState connectionState = null;
+ bool connectionInterruptionProcessingComplete = false;
- if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
- {
- connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
- }
-
- // Restore the session's consumers
+ if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
+ {
+ connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
+ }
+
+ // Restore the session's consumers
foreach(ConsumerState consumerState in sessionState.ConsumerStates)
{
ConsumerInfo infoToSend = consumerState.Info;
@@ -255,13 +255,13 @@ namespace Apache.NMS.ActiveMQ.State
if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
{
infoToSend = consumerState.Info.Clone() as ConsumerInfo;
- lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
- {
- if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
- {
- connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
- }
- }
+ lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+ {
+ if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+ {
+ connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+ }
+ }
infoToSend.PrefetchSize = 0;
if(Tracer.IsDebugEnabled)
{
@@ -316,10 +316,10 @@ namespace Apache.NMS.ActiveMQ.State
{
ConnectionState cs;
- if(connectionStates.TryGetValue(info.ConnectionId, out cs))
- {
- cs.AddTempDestination(info);
- }
+ if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+ {
+ cs.AddTempDestination(info);
+ }
}
return TRACKED_RESPONSE_MARKER;
}
@@ -329,8 +329,8 @@ namespace Apache.NMS.ActiveMQ.State
if(info != null && info.Destination.IsTemporary)
{
ConnectionState cs;
- if(connectionStates.TryGetValue(info.ConnectionId, out cs))
- {
+ if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+ {
cs.RemoveTempDestination(info.Destination);
}
}
@@ -348,8 +348,8 @@ namespace Apache.NMS.ActiveMQ.State
if(connectionId != null)
{
ConnectionState cs;
-
- if(connectionStates.TryGetValue(connectionId, out cs))
+
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
@@ -373,9 +373,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
-
- if(connectionStates.TryGetValue(connectionId, out cs))
+ ConnectionState cs = null;
+
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
@@ -399,9 +399,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
@@ -425,9 +425,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = sessionId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
SessionState ss = cs[sessionId];
if(ss != null)
@@ -435,7 +435,7 @@ namespace Apache.NMS.ActiveMQ.State
ss.RemoveConsumer(id);
}
- cs.RecoveringPullConsumers.Remove(id);
+ cs.RecoveringPullConsumers.Remove(id);
}
}
}
@@ -450,9 +450,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.SessionId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.AddSession(info);
}
@@ -468,9 +468,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = id.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.RemoveSession(id);
}
@@ -483,16 +483,16 @@ namespace Apache.NMS.ActiveMQ.State
{
if(info != null)
{
- ConnectionState connState = new ConnectionState(info);
+ ConnectionState connState = new ConnectionState(info);
- if(connectionStates.ContainsKey(info.ConnectionId))
- {
- connectionStates[info.ConnectionId] = connState;
- }
- else
- {
- connectionStates.Add(info.ConnectionId, connState);
- }
+ if(connectionStates.ContainsKey(info.ConnectionId))
+ {
+ connectionStates[info.ConnectionId] = connState;
+ }
+ else
+ {
+ connectionStates.Add(info.ConnectionId, connState);
+ }
}
return TRACKED_RESPONSE_MARKER;
@@ -517,9 +517,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = producerId.ParentId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[send.TransactionId];
if(transactionState != null)
@@ -552,9 +552,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[ack.TransactionId];
if(transactionState != null)
@@ -575,9 +575,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
cs.AddTransactionState(info.TransactionId);
TransactionState state = cs[info.TransactionId];
@@ -596,9 +596,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
@@ -619,9 +619,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
@@ -642,9 +642,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
@@ -665,9 +665,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
@@ -688,9 +688,9 @@ namespace Apache.NMS.ActiveMQ.State
ConnectionId connectionId = info.ConnectionId;
if(connectionId != null)
{
- ConnectionState cs = null;
+ ConnectionState cs = null;
- if(connectionStates.TryGetValue(connectionId, out cs))
+ if(connectionStates.TryGetValue(connectionId, out cs))
{
TransactionState transactionState = cs[info.TransactionId];
if(transactionState != null)
@@ -760,57 +760,61 @@ namespace Apache.NMS.ActiveMQ.State
public int MaxCacheSize
{
get { return maxCacheSize; }
- set
- {
- this.maxCacheSize = value;
- this.messageCache.MaxCacheSize = maxCacheSize;
- }
+ set
+ {
+ this.maxCacheSize = value;
+ this.messageCache.MaxCacheSize = maxCacheSize;
+ }
}
public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
{
- ConnectionState connectionState = null;
+ ConnectionState connectionState = null;
- if(connectionStates.TryGetValue(connectionId, out connectionState))
+ if(connectionStates.TryGetValue(connectionId, out connectionState))
{
connectionState.ConnectionInterruptProcessingComplete = true;
- lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
- {
- foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
- {
- ConsumerControl control = new ConsumerControl();
- control.ConsumerId = entry.Key;
- control.Prefetch = entry.Value.PrefetchSize;
- control.Destination = entry.Value.Destination;
- try
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
- " with: " + control.Prefetch);
- }
- transport.Oneway(control);
- }
- catch(Exception ex)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
- " with: " + control.Prefetch + "Error: " + ex.Message);
- }
- }
- }
- connectionState.RecoveringPullConsumers.Clear();
- }
+ Dictionary<ConsumerId, ConsumerInfo> consumersToRestorePrefetchOn;
+
+ lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+ {
+ consumersToRestorePrefetchOn = new Dictionary<ConsumerId, ConsumerInfo>(connectionState.RecoveringPullConsumers);
+ connectionState.RecoveringPullConsumers.Clear();
+ }
+
+ foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in consumersToRestorePrefetchOn)
+ {
+ ConsumerControl control = new ConsumerControl();
+ control.ConsumerId = entry.Key;
+ control.Prefetch = entry.Value.PrefetchSize;
+ control.Destination = entry.Value.Destination;
+ try
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch);
+ }
+ transport.Oneway(control);
+ }
+ catch(Exception ex)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch + "Error: " + ex.Message);
+ }
+ }
+ }
}
}
public void TransportInterrupted(ConnectionId id)
{
- ConnectionState connection = null;
+ ConnectionState connection = null;
- if(connectionStates.TryGetValue(id, out connection))
+ if(connectionStates.TryGetValue(id, out connection))
{
connection.ConnectionInterruptProcessingComplete = false;
}