You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2011/10/20 01:29:35 UTC
svn commit: r1186568 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State:
ConnectionState.cs ConnectionStateTracker.cs
Author: jgomes
Date: Wed Oct 19 23:29:35 2011
New Revision: 1186568
URL: http://svn.apache.org/viewvc?rev=1186568&view=rev
Log:
Add locking around iterating over recoveringPullConsumers. Add check for pre-existing consumer ID to avoid exception throw while recovering.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs?rev=1186568&r1=1186567&r2=1186568&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionState.cs Wed Oct 19 23:29:35 2011
@@ -31,8 +31,7 @@ namespace Apache.NMS.ActiveMQ.State
private readonly AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
private bool connectionInterruptProcessingComplete = true;
- private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers =
- new Dictionary<ConsumerId, ConsumerInfo>();
+ private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = new Dictionary<ConsumerId, ConsumerInfo>();
public ConnectionState(ConnectionInfo info)
{
@@ -179,7 +178,7 @@ namespace Apache.NMS.ActiveMQ.State
throw new ApplicationException("Disposed");
}
}
-
+
public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
{
get { return this.recoveringPullConsumers; }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs?rev=1186568&r1=1186567&r2=1186568&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/State/ConnectionStateTracker.cs Wed Oct 19 23:29:35 2011
@@ -19,6 +19,7 @@ using System;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Transport;
+using System.Collections;
namespace Apache.NMS.ActiveMQ.State
{
@@ -198,7 +199,13 @@ namespace Apache.NMS.ActiveMQ.State
if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
{
infoToSend = consumerState.Info.Clone() as ConsumerInfo;
- connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+ lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+ {
+ if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+ {
+ connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+ }
+ }
infoToSend.PrefetchSize = 0;
if(Tracer.IsDebugEnabled)
{
@@ -710,32 +717,34 @@ namespace Apache.NMS.ActiveMQ.State
{
connectionState.ConnectionInterruptProcessingComplete = true;
- Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
- foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
- {
- ConsumerControl control = new ConsumerControl();
- control.ConsumerId = entry.Key;
- control.Prefetch = entry.Value.PrefetchSize;
- control.Destination = entry.Value.Destination;
- try
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
- " with: " + control.Prefetch);
- }
- transport.Oneway(control);
- }
- catch(Exception ex)
- {
- if(Tracer.IsDebugEnabled)
- {
- Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
- " with: " + control.Prefetch + "Error: " + ex.Message);
- }
- }
- }
- stalledConsumers.Clear();
+ lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+ {
+ foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
+ {
+ ConsumerControl control = new ConsumerControl();
+ control.ConsumerId = entry.Key;
+ control.Prefetch = entry.Value.PrefetchSize;
+ control.Destination = entry.Value.Destination;
+ try
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch);
+ }
+ transport.Oneway(control);
+ }
+ catch(Exception ex)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+ " with: " + control.Prefetch + "Error: " + ex.Message);
+ }
+ }
+ }
+ connectionState.RecoveringPullConsumers.Clear();
+ }
}
}