You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/04/20 17:04:13 UTC

svn commit: r766723 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Author: tabish
Date: Mon Apr 20 15:04:13 2009
New Revision: 766723

URL: http://svn.apache.org/viewvc?rev=766723&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-158

Fix output of exception log statement.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=766723&r1=766722&r2=766723&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Mon Apr 20 15:04:13 2009
@@ -30,1011 +30,1011 @@
 
 namespace Apache.NMS.ActiveMQ.Transport.Failover
 {
-	/// <summary>
-	/// A Transport that is made reliable by being able to fail over to another
-	/// transport when a transport failure is detected.
-	/// </summary>
-	public class FailoverTransport : ICompositeTransport, IComparable
-	{
-
-		private static int _idCounter = 0;
-		private int _id;
-
-		private bool disposed;
-		private bool connected;
-		private List<Uri> uris = new List<Uri>();
-		private CommandHandler _commandHandler;
-		private ExceptionHandler _exceptionHandler;
-
-		private Mutex reconnectMutex = new Mutex();
-		private Mutex backupMutex = new Mutex();
-		private Mutex sleepMutex = new Mutex();
-		private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
-		private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
-
-		private Uri connectedTransportURI;
-		private Uri failedConnectTransportURI;
-		private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
-		private TaskRunner reconnectTask = null;
-		private bool started;
-
-		private int _initialReconnectDelay = 10;
-		private int _maxReconnectDelay = 1000 * 30;
-		private int _backOffMultiplier = 2;
-		private bool _useExponentialBackOff = true;
-		private bool _randomize = true;
-		private bool initialized;
-		private int _maxReconnectAttempts;
-		private int connectFailures;
-		private int _reconnectDelay = 10;
-		private Exception connectionFailure;
-		private bool firstConnection = true;
-		//optionally always have a backup created
-		private bool _backup = false;
-		private List<BackupTransport> backups = new List<BackupTransport>();
-		private int _backupPoolSize = 1;
-		private bool _trackMessages = false;
-		private int _maxCacheSize = 256;
-		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-
-		public TimeSpan RequestTimeout
-		{
-			get { return requestTimeout; }
-			set { requestTimeout = value; }
-		}
-
-		private class FailoverTask : Task
-		{
-			private FailoverTransport parent;
-
-			public FailoverTask(FailoverTransport p)
-			{
-				parent = p;
-			}
-
-			public bool iterate()
-			{
-				bool result = false;
-				bool buildBackup = true;
-				bool doReconnect = !parent.disposed;
-				try
-				{
-					parent.backupMutex.WaitOne();
-					if(parent.ConnectedTransport == null && doReconnect)
-					{
-						result = parent.doReconnect();
-						buildBackup = false;
-					}
-				}
-				finally
-				{
-					parent.backupMutex.ReleaseMutex();
-				}
-
-				if(buildBackup)
-				{
-					parent.buildBackups();
-				}
-				else
-				{
-					//build backups on the next iteration
-					result = true;
-					try
-					{
-						parent.reconnectTask.wakeup();
-					}
-					catch(ThreadInterruptedException)
-					{
-						Tracer.Debug("Reconnect task has been interrupted.");
-					}
-				}
-				return result;
-			}
-		}
-
-		public FailoverTransport()
-		{
-			_id = _idCounter++;
-
-			stateTracker.TrackTransactions = true;
-		}
-
-		~FailoverTransport()
-		{
-			Dispose(false);
-		}
-
-		public void onCommand(ITransport sender, Command command)
-		{
-			if(command != null)
-			{
-				if(command.IsResponse)
-				{
-					Object oo = null;
-					lock(requestMap)
-					{
-						int v = ((Response) command).CorrelationId;
-						try
-						{
-							oo = requestMap[v];
-							requestMap.Remove(v);
-						}
-						catch
-						{
-						}
-					}
-
-					Tracked t = oo as Tracked;
-					if(t != null)
-					{
-						t.onResponses();
-					}
-				}
-
-				if(!initialized)
-				{
-					if(command.IsBrokerInfo)
-					{
-						BrokerInfo info = (BrokerInfo) command;
-						BrokerInfo[] peers = info.PeerBrokerInfos;
-						if(peers != null)
-						{
-							for(int i = 0; i < peers.Length; i++)
-							{
-								String brokerString = peers[i].BrokerURL;
-								add(brokerString);
-							}
-						}
-
-						initialized = true;
-					}
-				}
-			}
-
-			this.Command(sender, command);
-		}
-
-		public void onException(ITransport sender, Exception error)
-		{
-			try
-			{
-				handleTransportFailure(error);
-			}
-			catch(Exception e)
-			{
-				e.GetType();
-				// What to do here?
-			}
-		}
-
-		public void disposedOnCommand(ITransport sender, Command c)
-		{
-		}
-
-		public void disposedOnException(ITransport sender, Exception e)
-		{
-		}
-
-		public void handleTransportFailure(Exception e)
-		{
-			ITransport transport = connectedTransport.GetAndSet(null);
-			if(transport != null)
-			{
-				transport.Command = new CommandHandler(disposedOnCommand);
-				transport.Exception = new ExceptionHandler(disposedOnException);
-				try
-				{
-					transport.Stop();
-				}
-				catch(Exception ex)
-				{
-					ex.GetType();	// Ignore errors but this lets us see the error during debugging
-				}
-
-				try
-				{
-					reconnectMutex.WaitOne();
-					bool reconnectOk = false;
-					if(started)
-					{
-						Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI, e.Message);
-						reconnectOk = true;
-					}
-
-					initialized = false;
-					failedConnectTransportURI = ConnectedTransportURI;
-					ConnectedTransportURI = null;
-					connected = false;
-					if(reconnectOk)
-					{
-						reconnectTask.wakeup();
-					}
-				}
-				finally
-				{
-					reconnectMutex.ReleaseMutex();
-				}
-			}
-		}
-
-		public void Start()
-		{
-			try
-			{
-				reconnectMutex.WaitOne();
-				Tracer.Debug("Started.");
-				if(started)
-				{
-					return;
-				}
-				started = true;
-				stateTracker.MaxCacheSize = MaxCacheSize;
-				stateTracker.TrackMessages = TrackMessages;
-				if(ConnectedTransport != null)
-				{
-					stateTracker.DoRestore(ConnectedTransport);
-				}
-				else
-				{
-					Reconnect();
-				}
-			}
-			finally
-			{
-				reconnectMutex.ReleaseMutex();
-			}
-		}
-
-		public virtual void Stop()
-		{
-			ITransport transportToStop = null;
-			try
-			{
-				reconnectMutex.WaitOne();
-				Tracer.Debug("Stopped.");
-				if(!started)
-				{
-					return;
-				}
-
-				started = false;
-				disposed = true;
-				connected = false;
-				foreach(BackupTransport t in backups)
-				{
-					t.Disposed = true;
-				}
-				backups.Clear();
-
-				if(ConnectedTransport != null)
-				{
-					transportToStop = connectedTransport.GetAndSet(null);
-				}
-			}
-			finally
-			{
-				reconnectMutex.ReleaseMutex();
-			}
-
-			try
-			{
-				sleepMutex.WaitOne();
-			}
-			finally
-			{
-				sleepMutex.ReleaseMutex();
-			}
-
-			reconnectTask.shutdown();
-			if(transportToStop != null)
-			{
-				transportToStop.Stop();
-			}
-		}
-
-		public int InitialReconnectDelay
-		{
-			get { return _initialReconnectDelay; }
-			set { _initialReconnectDelay = value; }
-		}
-
-		public int MaxReconnectDelay
-		{
-			get { return _maxReconnectDelay; }
-			set { _maxReconnectDelay = value; }
-		}
-
-		public int ReconnectDelay
-		{
-			get { return _reconnectDelay; }
-			set { _reconnectDelay = value; }
-		}
-
-		public int ReconnectDelayExponent
-		{
-			get { return _backOffMultiplier; }
-			set { _backOffMultiplier = value; }
-		}
-
-		public ITransport ConnectedTransport
-		{
-			get { return connectedTransport.Value; }
-			set { connectedTransport.Value = value; }
-		}
-
-		public Uri ConnectedTransportURI
-		{
-			get { return connectedTransportURI; }
-			set { connectedTransportURI = value; }
-		}
-
-		public int MaxReconnectAttempts
-		{
-			get { return _maxReconnectAttempts; }
-			set { _maxReconnectAttempts = value; }
-		}
-
-		public bool Randomize
-		{
-			get { return _randomize; }
-			set { _randomize = value; }
-		}
-
-		public bool Backup
-		{
-			get { return _backup; }
-			set { _backup = value; }
-		}
-
-		public int BackupPoolSize
-		{
-			get { return _backupPoolSize; }
-			set { _backupPoolSize = value; }
-		}
-
-		public bool TrackMessages
-		{
-			get { return _trackMessages; }
-			set { _trackMessages = value; }
-		}
-
-		public int MaxCacheSize
-		{
-			get { return _maxCacheSize; }
-			set { _maxCacheSize = value; }
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <param name="command"></param>
-		/// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
-		private bool IsShutdownCommand(Command command)
-		{
-			return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
-		}
-
-		public void Oneway(Command command)
-		{
-			Exception error = null;
-			try
-			{
-				reconnectMutex.WaitOne();
-
-				if(IsShutdownCommand(command) && ConnectedTransport == null)
-				{
-					if(command.IsShutdownInfo)
-					{
-						// Skipping send of ShutdownInfo command when not connected.
-						return;
-					}
-
-					if(command is RemoveInfo)
-					{
-						// Simulate response to RemoveInfo command
-						Response response = new Response();
-						response.CorrelationId = command.CommandId;
-						onCommand(this, response);
-						return;
-					}
-				}
-				// Keep trying until the message is sent.
-				for(int i = 0; !disposed; i++)
-				{
-					try
-					{
-						// Wait for transport to be connected.
-						ITransport transport = ConnectedTransport;
-						while(transport == null && !disposed
-							&& connectionFailure == null
-							// && !Thread.CurrentThread.isInterrupted()
-							)
-						{
-							Tracer.Info("Waiting for transport to reconnect.");
-							try
-							{
-								// Release so that the reconnect task can run
-								reconnectMutex.ReleaseMutex();
-								try
-								{
-									// Wait for something
-									Thread.Sleep(1000);
-								}
-								catch(ThreadInterruptedException e)
-								{
-									Tracer.DebugFormat("Interupted: {0}", e.Message);
-								}
-							}
-							finally
-							{
-								reconnectMutex.WaitOne();
-							}
-
-							transport = ConnectedTransport;
-						}
-
-						if(transport == null)
-						{
-							// Previous loop may have exited due to use being
-							// disposed.
-							if(disposed)
-							{
-								error = new IOException("Transport disposed.");
-							}
-							else if(connectionFailure != null)
-							{
-								error = connectionFailure;
-							}
-							else
-							{
-								error = new IOException("Unexpected failure.");
-							}
-							break;
-						}
-
-						// If it was a request and it was not being tracked by
-						// the state tracker,
-						// then hold it in the requestMap so that we can replay
-						// it later.
-						Tracked tracked = stateTracker.track(command);
-						lock(requestMap)
-						{
-							if(tracked != null && tracked.WaitingForResponse)
-							{
-								requestMap.Add(command.CommandId, tracked);
-							}
-							else if(tracked == null && command.ResponseRequired)
-							{
-								requestMap.Add(command.CommandId, command);
-							}
-						}
-
-						// Send the message.
-						try
-						{
-							transport.Oneway(command);
-							stateTracker.trackBack(command);
-						}
-						catch(Exception e)
-						{
-
-							// If the command was not tracked.. we will retry in
-							// this method
-							if(tracked == null)
-							{
-
-								// since we will retry in this method.. take it
-								// out of the request
-								// map so that it is not sent 2 times on
-								// recovery
-								if(command.ResponseRequired)
-								{
-									requestMap.Remove(command.CommandId);
-								}
-
-								// Rethrow the exception so it will handled by
-								// the outer catch
-								throw e;
-							}
-
-						}
-
-						return;
-
-					}
-					catch(Exception e)
-					{
-						Tracer.DebugFormat("Send Oneway attempt: {0} failed.", i);
-						handleTransportFailure(e);
-					}
-				}
-			}
-			finally
-			{
-				reconnectMutex.ReleaseMutex();
-			}
-
-			if(!disposed)
-			{
-				if(error != null)
-				{
-					throw error;
-				}
-			}
-		}
-
-		/*
-		public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
-		throw new AssertionError("Unsupported Method");
-		}
-		*/
-
-		public Object Request(Object command)
-		{
-			throw new ApplicationException("FailoverTransport does not support Request(Object)");
-		}
-
-		public Object Request(Object command, int timeout)
-		{
-			throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
-		}
-
-		public void add(Uri[] u)
-		{
-			lock(uris)
-			{
-				for(int i = 0; i < u.Length; i++)
-				{
-					if(!uris.Contains(u[i]))
-					{
-						uris.Add(u[i]);
-					}
-				}
-			}
-
-			Reconnect();
-		}
-
-		public void remove(Uri[] u)
-		{
-			lock(uris)
-			{
-				for(int i = 0; i < u.Length; i++)
-				{
-					uris.Remove(u[i]);
-				}
-			}
-
-			Reconnect();
-		}
-
-		public void add(String u)
-		{
-			try
-			{
-				Uri uri = new Uri(u);
-				lock(uris)
-				{
-					if(!uris.Contains(uri))
-					{
-						uris.Add(uri);
-					}
-				}
-
-				Reconnect();
-			}
-			catch(Exception e)
-			{
-				Tracer.ErrorFormat("Failed to parse URI: {0} because {1}", u, e.Message);
-			}
-		}
-
-		public void Reconnect()
-		{
-			try
-			{
-				reconnectMutex.WaitOne();
-
-				if(started)
-				{
-					if(reconnectTask == null)
-					{
-						Tracer.Debug("Creating reconnect task");
-						reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
-											"ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
-					}
-
-					Tracer.Debug("Waking up reconnect task");
-					try
-					{
-						reconnectTask.wakeup();
-					}
-					catch(ThreadInterruptedException)
-					{
-					}
-				}
-				else
-				{
-					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
-				}
-			}
-			finally
-			{
-				reconnectMutex.ReleaseMutex();
-			}
-		}
-
-		private List<Uri> ConnectList
-		{
-			get
-			{
-				List<Uri> l = new List<Uri>(uris);
-				bool removed = false;
-				if(failedConnectTransportURI != null)
-				{
-					removed = l.Remove(failedConnectTransportURI);
-				}
-
-				if(Randomize)
-				{
-					// Randomly, reorder the list by random swapping
-					Random r = new Random(DateTime.Now.Millisecond);
-					for(int i = 0; i < l.Count; i++)
-					{
-						int p = r.Next(l.Count);
-						Uri t = l[p];
-						l[p] = l[i];
-						l[i] = t;
-					}
-				}
-
-				if(removed)
-				{
-					l.Add(failedConnectTransportURI);
-				}
-
-				return l;
-			}
-		}
-
-		protected void restoreTransport(ITransport t)
-		{
-			t.Start();
-			//send information to the broker - informing it we are an ft client
-			ConnectionControl cc = new ConnectionControl();
-			cc.FaultTolerant = true;
-			t.Oneway(cc);
-			stateTracker.DoRestore(t);
-			Dictionary<int, Command> tmpMap = null;
-			lock(requestMap)
-			{
-				tmpMap = new Dictionary<int, Command>(requestMap);
-			}
-
-			foreach(Command command in tmpMap.Values)
-			{
-				t.Oneway(command);
-			}
-		}
-
-		public bool UseExponentialBackOff
-		{
-			get { return _useExponentialBackOff; }
-			set { _useExponentialBackOff = value; }
-		}
-
-		public override String ToString()
-		{
-			return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
-		}
-
-		public String RemoteAddress
-		{
-			get
-			{
-				FailoverTransport transport = ConnectedTransport as FailoverTransport;
-				if(transport != null)
-				{
-					return transport.RemoteAddress;
-				}
-				return null;
-			}
-		}
-
-		public bool IsFaultTolerant
-		{
-			get { return true; }
-		}
-
-		bool doReconnect()
-		{
-			Exception failure = null;
-			try
-			{
-				reconnectMutex.WaitOne();
-
-				if(disposed || connectionFailure != null)
-				{
-				}
-
-				if(ConnectedTransport != null || disposed || connectionFailure != null)
-				{
-					return false;
-				}
-				else
-				{
-					List<Uri> connectList = ConnectList;
-					if(connectList.Count == 0)
-					{
-						failure = new IOException("No uris available to connect to.");
-					}
-					else
-					{
-						if(!UseExponentialBackOff)
-						{
-							ReconnectDelay = InitialReconnectDelay;
-						}
-						try
-						{
-							backupMutex.WaitOne();
-							if(Backup && backups.Count != 0)
-							{
-								BackupTransport bt = backups[0];
-								backups.RemoveAt(0);
-								ITransport t = bt.Transport;
-								Uri uri = bt.Uri;
-								t.Command = new CommandHandler(onCommand);
-								t.Exception = new ExceptionHandler(onException);
-								try
-								{
-									if(started)
-									{
-										restoreTransport(t);
-									}
-									ReconnectDelay = InitialReconnectDelay;
-									failedConnectTransportURI = null;
-									ConnectedTransportURI = uri;
-									ConnectedTransport = t;
-									connectFailures = 0;
-									Tracer.InfoFormat("Successfully reconnected to backup {0}", uri);
-									return false;
-								}
-								catch(Exception e)
-								{
-									e.GetType();
-									Tracer.Debug("Backup transport failed");
-								}
-							}
-						}
-						finally
-						{
-							backupMutex.ReleaseMutex();
-						}
-
-						foreach(Uri uri in connectList)
-						{
-							if(ConnectedTransport != null || disposed)
-							{
-								break;
-							}
-
-							try
-							{
-								Tracer.DebugFormat("Attempting connect to: {0}", uri);
-								ITransport t = TransportFactory.CompositeConnect(uri);
-								t.Command = new CommandHandler(onCommand);
-								t.Exception = new ExceptionHandler(onException);
-								t.Start();
-
-								if(started)
-								{
-									restoreTransport(t);
-								}
-
-								Tracer.Debug("Connection established");
-								ReconnectDelay = InitialReconnectDelay;
-								ConnectedTransportURI = uri;
-								ConnectedTransport = t;
-								connectFailures = 0;
-
-								if(firstConnection)
-								{
-									firstConnection = false;
-									Tracer.InfoFormat("Successfully connected to: {0}", uri);
-								}
-								else
-								{
-									Tracer.InfoFormat("Successfully reconnected to: {0}", uri);
-								}
-
-								connected = true;
-								return false;
-							}
-							catch(Exception e)
-							{
-								failure = e;
-								Tracer.DebugFormat("Connect fail to: (0}, reason: {1}", uri, e.Message);
-							}
-						}
-					}
-				}
-
-				if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
-				{
-					Tracer.ErrorFormat("Failed to connect to transport after: {0} attempt(s)", connectFailures);
-					connectionFailure = failure;
-					onException(this, connectionFailure);
-					return false;
-				}
-			}
-			finally
-			{
-				reconnectMutex.ReleaseMutex();
-			}
-
-			if(!disposed)
-			{
-
-				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
-				try
-				{
-					sleepMutex.WaitOne();
-					try
-					{
-						Thread.Sleep(ReconnectDelay);
-					}
-					catch(ThreadInterruptedException)
-					{
-					}
-				}
-				finally
-				{
-					sleepMutex.ReleaseMutex();
-				}
-
-				if(UseExponentialBackOff)
-				{
-					// Exponential increment of reconnect delay.
-					ReconnectDelay *= ReconnectDelayExponent;
-					if(ReconnectDelay > MaxReconnectDelay)
-					{
-						ReconnectDelay = MaxReconnectDelay;
-					}
-				}
-			}
-			return !disposed;
-		}
-
-
-		bool buildBackups()
-		{
-			try
-			{
-				backupMutex.WaitOne();
-				if(!disposed && Backup && backups.Count < BackupPoolSize)
-				{
-					List<Uri> connectList = ConnectList;
-					foreach(BackupTransport bt in backups)
-					{
-						if(bt.Disposed)
-						{
-							backups.Remove(bt);
-						}
-					}
-
-					foreach(Uri uri in connectList)
-					{
-						if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
-						{
-							try
-							{
-								BackupTransport bt = new BackupTransport(this);
-								bt.Uri = uri;
-								if(!backups.Contains(bt))
-								{
-									ITransport t = TransportFactory.CompositeConnect(uri);
-									t.Command = new CommandHandler(bt.onCommand);
-									t.Exception = new ExceptionHandler(bt.onException);
-									t.Start();
-									bt.Transport = t;
-									backups.Add(bt);
-								}
-							}
-							catch(Exception e)
-							{
-								Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
-							}
-						}
-
-						if(backups.Count < BackupPoolSize)
-						{
-							break;
-						}
-					}
-				}
-			}
-			finally
-			{
-				backupMutex.ReleaseMutex();
-			}
-
-			return false;
-		}
-
-		public bool IsDisposed
-		{
-			get { return disposed; }
-		}
-
-		public bool Connected
-		{
-			get { return connected; }
-		}
-
-		public void Reconnect(Uri uri)
-		{
-			add(new Uri[] { uri });
-		}
-
-		public FutureResponse AsyncRequest(Command command)
-		{
-			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
-		}
-
-		public Response Request(Command command)
-		{
-			throw new ApplicationException("FailoverTransport does not implement Request(Command)");
-		}
-
-		public Response Request(Command command, TimeSpan ts)
-		{
-			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
-		}
-
-		public CommandHandler Command
-		{
-			get { return _commandHandler; }
-			set { _commandHandler = value; }
-		}
-
-		public ExceptionHandler Exception
-		{
-			get { return _exceptionHandler; }
-			set { _exceptionHandler = value; }
-		}
-
-		public bool IsStarted
-		{
-			get { return started; }
-		}
-
-		public void Dispose()
-		{
-			Dispose(true);
-			GC.SuppressFinalize(this);
-		}
-
-		public void Dispose(bool disposing)
-		{
-			if(disposing)
-			{
-				// get rid of unmanaged stuff
-			}
-
-			disposed = true;
-		}
-
-		public int CompareTo(Object o)
-		{
-			if(o is FailoverTransport)
-			{
-				FailoverTransport oo = o as FailoverTransport;
-
-				return this._id - oo._id;
-			}
-			else
-			{
-				throw new ArgumentException();
-			}
-		}
-	}
+    /// <summary>
+    /// A Transport that is made reliable by being able to fail over to another
+    /// transport when a transport failure is detected.
+    /// </summary>
+    public class FailoverTransport : ICompositeTransport, IComparable
+    {
+
+        private static int _idCounter = 0;
+        private int _id;
+
+        private bool disposed;
+        private bool connected;
+        private List<Uri> uris = new List<Uri>();
+        private CommandHandler _commandHandler;
+        private ExceptionHandler _exceptionHandler;
+
+        private Mutex reconnectMutex = new Mutex();
+        private Mutex backupMutex = new Mutex();
+        private Mutex sleepMutex = new Mutex();
+        private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+        private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+        private Uri connectedTransportURI;
+        private Uri failedConnectTransportURI;
+        private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+        private TaskRunner reconnectTask = null;
+        private bool started;
+
+        private int _initialReconnectDelay = 10;
+        private int _maxReconnectDelay = 1000 * 30;
+        private int _backOffMultiplier = 2;
+        private bool _useExponentialBackOff = true;
+        private bool _randomize = true;
+        private bool initialized;
+        private int _maxReconnectAttempts;
+        private int connectFailures;
+        private int _reconnectDelay = 10;
+        private Exception connectionFailure;
+        private bool firstConnection = true;
+        //optionally always have a backup created
+        private bool _backup = false;
+        private List<BackupTransport> backups = new List<BackupTransport>();
+        private int _backupPoolSize = 1;
+        private bool _trackMessages = false;
+        private int _maxCacheSize = 256;
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+
+        public TimeSpan RequestTimeout
+        {
+            get { return requestTimeout; }
+            set { requestTimeout = value; }
+        }
+
+        private class FailoverTask : Task
+        {
+            private FailoverTransport parent;
+
+            public FailoverTask(FailoverTransport p)
+            {
+                parent = p;
+            }
+
+            public bool iterate()
+            {
+                bool result = false;
+                bool buildBackup = true;
+                bool doReconnect = !parent.disposed;
+                try
+                {
+                    parent.backupMutex.WaitOne();
+                    if(parent.ConnectedTransport == null && doReconnect)
+                    {
+                        result = parent.doReconnect();
+                        buildBackup = false;
+                    }
+                }
+                finally
+                {
+                    parent.backupMutex.ReleaseMutex();
+                }
+
+                if(buildBackup)
+                {
+                    parent.buildBackups();
+                }
+                else
+                {
+                    //build backups on the next iteration
+                    result = true;
+                    try
+                    {
+                        parent.reconnectTask.wakeup();
+                    }
+                    catch(ThreadInterruptedException)
+                    {
+                        Tracer.Debug("Reconnect task has been interrupted.");
+                    }
+                }
+                return result;
+            }
+        }
+
+        public FailoverTransport()
+        {
+            _id = _idCounter++;
+
+            stateTracker.TrackTransactions = true;
+        }
+
+        ~FailoverTransport()
+        {
+            Dispose(false);
+        }
+
+        public void onCommand(ITransport sender, Command command)
+        {
+            if(command != null)
+            {
+                if(command.IsResponse)
+                {
+                    Object oo = null;
+                    lock(requestMap)
+                    {
+                        int v = ((Response) command).CorrelationId;
+                        try
+                        {
+                            oo = requestMap[v];
+                            requestMap.Remove(v);
+                        }
+                        catch
+                        {
+                        }
+                    }
+
+                    Tracked t = oo as Tracked;
+                    if(t != null)
+                    {
+                        t.onResponses();
+                    }
+                }
+
+                if(!initialized)
+                {
+                    if(command.IsBrokerInfo)
+                    {
+                        BrokerInfo info = (BrokerInfo) command;
+                        BrokerInfo[] peers = info.PeerBrokerInfos;
+                        if(peers != null)
+                        {
+                            for(int i = 0; i < peers.Length; i++)
+                            {
+                                String brokerString = peers[i].BrokerURL;
+                                add(brokerString);
+                            }
+                        }
+
+                        initialized = true;
+                    }
+                }
+            }
+
+            this.Command(sender, command);
+        }
+
+        public void onException(ITransport sender, Exception error)
+        {
+            try
+            {
+                handleTransportFailure(error);
+            }
+            catch(Exception e)
+            {
+                e.GetType();
+                // What to do here?
+            }
+        }
+
+        public void disposedOnCommand(ITransport sender, Command c)
+        {
+        }
+
+        public void disposedOnException(ITransport sender, Exception e)
+        {
+        }
+
+        public void handleTransportFailure(Exception e)
+        {
+            ITransport transport = connectedTransport.GetAndSet(null);
+            if(transport != null)
+            {
+                transport.Command = new CommandHandler(disposedOnCommand);
+                transport.Exception = new ExceptionHandler(disposedOnException);
+                try
+                {
+                    transport.Stop();
+                }
+                catch(Exception ex)
+                {
+                    ex.GetType();	// Ignore errors but this lets us see the error during debugging
+                }
+
+                try
+                {
+                    reconnectMutex.WaitOne();
+                    bool reconnectOk = false;
+                    if(started)
+                    {
+                        Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI, e.Message);
+                        reconnectOk = true;
+                    }
+
+                    initialized = false;
+                    failedConnectTransportURI = ConnectedTransportURI;
+                    ConnectedTransportURI = null;
+                    connected = false;
+                    if(reconnectOk)
+                    {
+                        reconnectTask.wakeup();
+                    }
+                }
+                finally
+                {
+                    reconnectMutex.ReleaseMutex();
+                }
+            }
+        }
+
+        public void Start()
+        {
+            try
+            {
+                reconnectMutex.WaitOne();
+                Tracer.Debug("Started.");
+                if(started)
+                {
+                    return;
+                }
+                started = true;
+                stateTracker.MaxCacheSize = MaxCacheSize;
+                stateTracker.TrackMessages = TrackMessages;
+                if(ConnectedTransport != null)
+                {
+                    stateTracker.DoRestore(ConnectedTransport);
+                }
+                else
+                {
+                    Reconnect();
+                }
+            }
+            finally
+            {
+                reconnectMutex.ReleaseMutex();
+            }
+        }
+
+        public virtual void Stop()
+        {
+            ITransport transportToStop = null;
+            try
+            {
+                reconnectMutex.WaitOne();
+                Tracer.Debug("Stopped.");
+                if(!started)
+                {
+                    return;
+                }
+
+                started = false;
+                disposed = true;
+                connected = false;
+                foreach(BackupTransport t in backups)
+                {
+                    t.Disposed = true;
+                }
+                backups.Clear();
+
+                if(ConnectedTransport != null)
+                {
+                    transportToStop = connectedTransport.GetAndSet(null);
+                }
+            }
+            finally
+            {
+                reconnectMutex.ReleaseMutex();
+            }
+
+            try
+            {
+                sleepMutex.WaitOne();
+            }
+            finally
+            {
+                sleepMutex.ReleaseMutex();
+            }
+
+            reconnectTask.shutdown();
+            if(transportToStop != null)
+            {
+                transportToStop.Stop();
+            }
+        }
+
+        public int InitialReconnectDelay
+        {
+            get { return _initialReconnectDelay; }
+            set { _initialReconnectDelay = value; }
+        }
+
+        public int MaxReconnectDelay
+        {
+            get { return _maxReconnectDelay; }
+            set { _maxReconnectDelay = value; }
+        }
+
+        public int ReconnectDelay
+        {
+            get { return _reconnectDelay; }
+            set { _reconnectDelay = value; }
+        }
+
+        public int ReconnectDelayExponent
+        {
+            get { return _backOffMultiplier; }
+            set { _backOffMultiplier = value; }
+        }
+
+        public ITransport ConnectedTransport
+        {
+            get { return connectedTransport.Value; }
+            set { connectedTransport.Value = value; }
+        }
+
+        public Uri ConnectedTransportURI
+        {
+            get { return connectedTransportURI; }
+            set { connectedTransportURI = value; }
+        }
+
+        public int MaxReconnectAttempts
+        {
+            get { return _maxReconnectAttempts; }
+            set { _maxReconnectAttempts = value; }
+        }
+
+        public bool Randomize
+        {
+            get { return _randomize; }
+            set { _randomize = value; }
+        }
+
+        public bool Backup
+        {
+            get { return _backup; }
+            set { _backup = value; }
+        }
+
+        public int BackupPoolSize
+        {
+            get { return _backupPoolSize; }
+            set { _backupPoolSize = value; }
+        }
+
+        public bool TrackMessages
+        {
+            get { return _trackMessages; }
+            set { _trackMessages = value; }
+        }
+
+        public int MaxCacheSize
+        {
+            get { return _maxCacheSize; }
+            set { _maxCacheSize = value; }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <param name="command"></param>
+        /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+        private bool IsShutdownCommand(Command command)
+        {
+            return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
+        }
+
+        public void Oneway(Command command)
+        {
+            Exception error = null;
+            try
+            {
+                reconnectMutex.WaitOne();
+
+                if(IsShutdownCommand(command) && ConnectedTransport == null)
+                {
+                    if(command.IsShutdownInfo)
+                    {
+                        // Skipping send of ShutdownInfo command when not connected.
+                        return;
+                    }
+
+                    if(command is RemoveInfo)
+                    {
+                        // Simulate response to RemoveInfo command
+                        Response response = new Response();
+                        response.CorrelationId = command.CommandId;
+                        onCommand(this, response);
+                        return;
+                    }
+                }
+                // Keep trying until the message is sent.
+                for(int i = 0; !disposed; i++)
+                {
+                    try
+                    {
+                        // Wait for transport to be connected.
+                        ITransport transport = ConnectedTransport;
+                        while(transport == null && !disposed
+                            && connectionFailure == null
+                            // && !Thread.CurrentThread.isInterrupted()
+                            )
+                        {
+                            Tracer.Info("Waiting for transport to reconnect.");
+                            try
+                            {
+                                // Release so that the reconnect task can run
+                                reconnectMutex.ReleaseMutex();
+                                try
+                                {
+                                    // Wait for something
+                                    Thread.Sleep(1000);
+                                }
+                                catch(ThreadInterruptedException e)
+                                {
+                                    Tracer.DebugFormat("Interupted: {0}", e.Message);
+                                }
+                            }
+                            finally
+                            {
+                                reconnectMutex.WaitOne();
+                            }
+
+                            transport = ConnectedTransport;
+                        }
+
+                        if(transport == null)
+                        {
+                            // Previous loop may have exited due to use being
+                            // disposed.
+                            if(disposed)
+                            {
+                                error = new IOException("Transport disposed.");
+                            }
+                            else if(connectionFailure != null)
+                            {
+                                error = connectionFailure;
+                            }
+                            else
+                            {
+                                error = new IOException("Unexpected failure.");
+                            }
+                            break;
+                        }
+
+                        // If it was a request and it was not being tracked by
+                        // the state tracker,
+                        // then hold it in the requestMap so that we can replay
+                        // it later.
+                        Tracked tracked = stateTracker.track(command);
+                        lock(requestMap)
+                        {
+                            if(tracked != null && tracked.WaitingForResponse)
+                            {
+                                requestMap.Add(command.CommandId, tracked);
+                            }
+                            else if(tracked == null && command.ResponseRequired)
+                            {
+                                requestMap.Add(command.CommandId, command);
+                            }
+                        }
+
+                        // Send the message.
+                        try
+                        {
+                            transport.Oneway(command);
+                            stateTracker.trackBack(command);
+                        }
+                        catch(Exception e)
+                        {
+
+                            // If the command was not tracked.. we will retry in
+                            // this method
+                            if(tracked == null)
+                            {
+
+                                // since we will retry in this method.. take it
+                                // out of the request
+                                // map so that it is not sent 2 times on
+                                // recovery
+                                if(command.ResponseRequired)
+                                {
+                                    requestMap.Remove(command.CommandId);
+                                }
+
+                                // Rethrow the exception so it will handled by
+                                // the outer catch
+                                throw e;
+                            }
+
+                        }
+
+                        return;
+
+                    }
+                    catch(Exception e)
+                    {
+                        Tracer.DebugFormat("Send Oneway attempt: {0} failed.", i);
+                        handleTransportFailure(e);
+                    }
+                }
+            }
+            finally
+            {
+                reconnectMutex.ReleaseMutex();
+            }
+
+            if(!disposed)
+            {
+                if(error != null)
+                {
+                    throw error;
+                }
+            }
+        }
+
+        /*
+        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
+        throw new AssertionError("Unsupported Method");
+        }
+        */
+
+        public Object Request(Object command)
+        {
+            throw new ApplicationException("FailoverTransport does not support Request(Object)");
+        }
+
+        public Object Request(Object command, int timeout)
+        {
+            throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
+        }
+
+        public void add(Uri[] u)
+        {
+            lock(uris)
+            {
+                for(int i = 0; i < u.Length; i++)
+                {
+                    if(!uris.Contains(u[i]))
+                    {
+                        uris.Add(u[i]);
+                    }
+                }
+            }
+
+            Reconnect();
+        }
+
+        public void remove(Uri[] u)
+        {
+            lock(uris)
+            {
+                for(int i = 0; i < u.Length; i++)
+                {
+                    uris.Remove(u[i]);
+                }
+            }
+
+            Reconnect();
+        }
+
+        public void add(String u)
+        {
+            try
+            {
+                Uri uri = new Uri(u);
+                lock(uris)
+                {
+                    if(!uris.Contains(uri))
+                    {
+                        uris.Add(uri);
+                    }
+                }
+
+                Reconnect();
+            }
+            catch(Exception e)
+            {
+                Tracer.ErrorFormat("Failed to parse URI: {0} because {1}", u, e.Message);
+            }
+        }
+
+        public void Reconnect()
+        {
+            try
+            {
+                reconnectMutex.WaitOne();
+
+                if(started)
+                {
+                    if(reconnectTask == null)
+                    {
+                        Tracer.Debug("Creating reconnect task");
+                        reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
+                                            "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+                    }
+
+                    Tracer.Debug("Waking up reconnect task");
+                    try
+                    {
+                        reconnectTask.wakeup();
+                    }
+                    catch(ThreadInterruptedException)
+                    {
+                    }
+                }
+                else
+                {
+                    Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+                }
+            }
+            finally
+            {
+                reconnectMutex.ReleaseMutex();
+            }
+        }
+
+        private List<Uri> ConnectList
+        {
+            get
+            {
+                List<Uri> l = new List<Uri>(uris);
+                bool removed = false;
+                if(failedConnectTransportURI != null)
+                {
+                    removed = l.Remove(failedConnectTransportURI);
+                }
+
+                if(Randomize)
+                {
+                    // Randomly, reorder the list by random swapping
+                    Random r = new Random(DateTime.Now.Millisecond);
+                    for(int i = 0; i < l.Count; i++)
+                    {
+                        int p = r.Next(l.Count);
+                        Uri t = l[p];
+                        l[p] = l[i];
+                        l[i] = t;
+                    }
+                }
+
+                if(removed)
+                {
+                    l.Add(failedConnectTransportURI);
+                }
+
+                return l;
+            }
+        }
+
+        protected void restoreTransport(ITransport t)
+        {
+            t.Start();
+            //send information to the broker - informing it we are an ft client
+            ConnectionControl cc = new ConnectionControl();
+            cc.FaultTolerant = true;
+            t.Oneway(cc);
+            stateTracker.DoRestore(t);
+            Dictionary<int, Command> tmpMap = null;
+            lock(requestMap)
+            {
+                tmpMap = new Dictionary<int, Command>(requestMap);
+            }
+
+            foreach(Command command in tmpMap.Values)
+            {
+                t.Oneway(command);
+            }
+        }
+
+        public bool UseExponentialBackOff
+        {
+            get { return _useExponentialBackOff; }
+            set { _useExponentialBackOff = value; }
+        }
+
+        public override String ToString()
+        {
+            return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
+        }
+
+        public String RemoteAddress
+        {
+            get
+            {
+                FailoverTransport transport = ConnectedTransport as FailoverTransport;
+                if(transport != null)
+                {
+                    return transport.RemoteAddress;
+                }
+                return null;
+            }
+        }
+
+        public bool IsFaultTolerant
+        {
+            get { return true; }
+        }
+
+        bool doReconnect()
+        {
+            Exception failure = null;
+            try
+            {
+                reconnectMutex.WaitOne();
+
+                if(disposed || connectionFailure != null)
+                {
+                }
+
+                if(ConnectedTransport != null || disposed || connectionFailure != null)
+                {
+                    return false;
+                }
+                else
+                {
+                    List<Uri> connectList = ConnectList;
+                    if(connectList.Count == 0)
+                    {
+                        failure = new IOException("No uris available to connect to.");
+                    }
+                    else
+                    {
+                        if(!UseExponentialBackOff)
+                        {
+                            ReconnectDelay = InitialReconnectDelay;
+                        }
+                        try
+                        {
+                            backupMutex.WaitOne();
+                            if(Backup && backups.Count != 0)
+                            {
+                                BackupTransport bt = backups[0];
+                                backups.RemoveAt(0);
+                                ITransport t = bt.Transport;
+                                Uri uri = bt.Uri;
+                                t.Command = new CommandHandler(onCommand);
+                                t.Exception = new ExceptionHandler(onException);
+                                try
+                                {
+                                    if(started)
+                                    {
+                                        restoreTransport(t);
+                                    }
+                                    ReconnectDelay = InitialReconnectDelay;
+                                    failedConnectTransportURI = null;
+                                    ConnectedTransportURI = uri;
+                                    ConnectedTransport = t;
+                                    connectFailures = 0;
+                                    Tracer.InfoFormat("Successfully reconnected to backup {0}", uri);
+                                    return false;
+                                }
+                                catch(Exception e)
+                                {
+                                    e.GetType();
+                                    Tracer.Debug("Backup transport failed");
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            backupMutex.ReleaseMutex();
+                        }
+
+                        foreach(Uri uri in connectList)
+                        {
+                            if(ConnectedTransport != null || disposed)
+                            {
+                                break;
+                            }
+
+                            try
+                            {
+                                Tracer.DebugFormat("Attempting connect to: {0}", uri);
+                                ITransport t = TransportFactory.CompositeConnect(uri);
+                                t.Command = new CommandHandler(onCommand);
+                                t.Exception = new ExceptionHandler(onException);
+                                t.Start();
+
+                                if(started)
+                                {
+                                    restoreTransport(t);
+                                }
+
+                                Tracer.Debug("Connection established");
+                                ReconnectDelay = InitialReconnectDelay;
+                                ConnectedTransportURI = uri;
+                                ConnectedTransport = t;
+                                connectFailures = 0;
+
+                                if(firstConnection)
+                                {
+                                    firstConnection = false;
+                                    Tracer.InfoFormat("Successfully connected to: {0}", uri);
+                                }
+                                else
+                                {
+                                    Tracer.InfoFormat("Successfully reconnected to: {0}", uri);
+                                }
+
+                                connected = true;
+                                return false;
+                            }
+                            catch(Exception e)
+                            {
+                                failure = e;
+                                Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
+                            }
+                        }
+                    }
+                }
+
+                if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+                {
+                    Tracer.ErrorFormat("Failed to connect to transport after: {0} attempt(s)", connectFailures);
+                    connectionFailure = failure;
+                    onException(this, connectionFailure);
+                    return false;
+                }
+            }
+            finally
+            {
+                reconnectMutex.ReleaseMutex();
+            }
+
+            if(!disposed)
+            {
+
+                Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+                try
+                {
+                    sleepMutex.WaitOne();
+                    try
+                    {
+                        Thread.Sleep(ReconnectDelay);
+                    }
+                    catch(ThreadInterruptedException)
+                    {
+                    }
+                }
+                finally
+                {
+                    sleepMutex.ReleaseMutex();
+                }
+
+                if(UseExponentialBackOff)
+                {
+                    // Exponential increment of reconnect delay.
+                    ReconnectDelay *= ReconnectDelayExponent;
+                    if(ReconnectDelay > MaxReconnectDelay)
+                    {
+                        ReconnectDelay = MaxReconnectDelay;
+                    }
+                }
+            }
+            return !disposed;
+        }
+
+
+        bool buildBackups()
+        {
+            try
+            {
+                backupMutex.WaitOne();
+                if(!disposed && Backup && backups.Count < BackupPoolSize)
+                {
+                    List<Uri> connectList = ConnectList;
+                    foreach(BackupTransport bt in backups)
+                    {
+                        if(bt.Disposed)
+                        {
+                            backups.Remove(bt);
+                        }
+                    }
+
+                    foreach(Uri uri in connectList)
+                    {
+                        if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+                        {
+                            try
+                            {
+                                BackupTransport bt = new BackupTransport(this);
+                                bt.Uri = uri;
+                                if(!backups.Contains(bt))
+                                {
+                                    ITransport t = TransportFactory.CompositeConnect(uri);
+                                    t.Command = new CommandHandler(bt.onCommand);
+                                    t.Exception = new ExceptionHandler(bt.onException);
+                                    t.Start();
+                                    bt.Transport = t;
+                                    backups.Add(bt);
+                                }
+                            }
+                            catch(Exception e)
+                            {
+                                Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
+                            }
+                        }
+
+                        if(backups.Count < BackupPoolSize)
+                        {
+                            break;
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                backupMutex.ReleaseMutex();
+            }
+
+            return false;
+        }
+
+        public bool IsDisposed
+        {
+            get { return disposed; }
+        }
+
+        public bool Connected
+        {
+            get { return connected; }
+        }
+
+        public void Reconnect(Uri uri)
+        {
+            add(new Uri[] { uri });
+        }
+
+        public FutureResponse AsyncRequest(Command command)
+        {
+            throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+        }
+
+        public Response Request(Command command)
+        {
+            throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+        }
+
+        public Response Request(Command command, TimeSpan ts)
+        {
+            throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+        }
+
+        public CommandHandler Command
+        {
+            get { return _commandHandler; }
+            set { _commandHandler = value; }
+        }
+
+        public ExceptionHandler Exception
+        {
+            get { return _exceptionHandler; }
+            set { _exceptionHandler = value; }
+        }
+
+        public bool IsStarted
+        {
+            get { return started; }
+        }
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        public void Dispose(bool disposing)
+        {
+            if(disposing)
+            {
+                // get rid of unmanaged stuff
+            }
+
+            disposed = true;
+        }
+
+        public int CompareTo(Object o)
+        {
+            if(o is FailoverTransport)
+            {
+                FailoverTransport oo = o as FailoverTransport;
+
+                return this._id - oo._id;
+            }
+            else
+            {
+                throw new ArgumentException();
+            }
+        }
+    }
 }