You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2008/10/24 23:10:23 UTC

svn commit: r707747 [4/4] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/State/ src/main/csharp/Threads/ src/main/csharp/Transport/ src/main/csharp/Transport/Failover/ src/main/cs...

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+	/// <summary>
+	/// Factory for <see cref="TaskRunner"/> instances that execute long running
+	/// tasks. Such tasks are not always active, but once active they may need
+	/// several iterations of processing before they become idle again. Every
+	/// runner created here is given a maximum number of iterations per run; the
+	/// stated intent is that each task gets processed while no single task takes
+	/// over the system, which is a kind of cooperative multitasking.
+	/// </summary>
+	public class TaskRunnerFactory
+	{
+		// Settings captured by initTaskRunnerFactory. Only maxIterationsPerRun is
+		// read again inside this class (by CreateTaskRunner).
+		private String name;
+		private ThreadPriority priority;
+		private bool daemon;
+		private int maxIterationsPerRun;
+
+		/// <summary>
+		/// Creates a factory with the defaults: name "ActiveMQ Task", normal
+		/// priority, daemon flag set and 1000 iterations per run.
+		/// </summary>
+		public TaskRunnerFactory()
+			: this("ActiveMQ Task", ThreadPriority.Normal, true, 1000, false)
+		{
+		}
+
+		/// <summary>
+		/// Creates a factory with explicit settings and no dedicated task runner.
+		/// </summary>
+		public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun)
+			: this(name, priority, daemon, maxIterationsPerRun, false)
+		{
+		}
+
+		/// <summary>
+		/// Creates a factory with explicit settings.
+		/// </summary>
+		/// <param name="name">Name recorded for the factory.</param>
+		/// <param name="priority">Thread priority recorded for the factory.</param>
+		/// <param name="daemon">Daemon flag recorded for the factory.</param>
+		/// <param name="maxIterationsPerRun">Iteration limit handed to every created runner.</param>
+		/// <param name="dedicatedTaskRunner">Accepted but currently not acted upon.</param>
+		public TaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
+		{
+			initTaskRunnerFactory(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner);
+		}
+
+		/// <summary>
+		/// Stores the supplied settings on this factory. The
+		/// <paramref name="dedicatedTaskRunner"/> flag is not stored or used.
+		/// </summary>
+		public void initTaskRunnerFactory(String name, ThreadPriority priority, bool daemon, int maxIterationsPerRun, bool dedicatedTaskRunner)
+		{
+			this.maxIterationsPerRun = maxIterationsPerRun;
+			this.daemon = daemon;
+			this.priority = priority;
+			this.name = name;
+
+			// If the platform has a good thread model, a DedicatedTaskRunner could
+			// be used here instead of pooled runners; that choice is not wired up.
+		}
+
+		/// <summary>
+		/// Currently a no-op.
+		/// </summary>
+		public void shutdown()
+		{
+		}
+
+		/// <summary>
+		/// Creates a pooled runner for <paramref name="task"/> limited to the
+		/// configured number of iterations per run.
+		/// </summary>
+		/// <param name="task">The task the runner will drive.</param>
+		/// <param name="name">Accepted for API symmetry; not used by this implementation.</param>
+		/// <returns>A new <see cref="PooledTaskRunner"/>.</returns>
+		public TaskRunner CreateTaskRunner(Task task, String name)
+		{
+			TaskRunner runner = new PooledTaskRunner(task, maxIterationsPerRun);
+			return runner;
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+	/// <summary>
+	/// Holds a standby transport, together with the URI it was created for, on
+	/// behalf of a <see cref="FailoverTransport"/>. Two instances are considered
+	/// equal when their URIs are equal (or both null).
+	/// </summary>
+	class BackupTransport
+	{
+		private FailoverTransport failoverTransport;
+		private ITransport transport;
+		private Uri uri;
+		private bool disposed;
+
+		/// <summary>
+		/// Creates a backup holder owned by <paramref name="ft"/>.
+		/// </summary>
+		/// <param name="ft">Owning failover transport; may be null.</param>
+		public BackupTransport(FailoverTransport ft)
+		{
+			this.failoverTransport = ft;
+		}
+
+		/// <summary>
+		/// Command handler for the standby transport; incoming commands are ignored.
+		/// </summary>
+		public void onCommand(ITransport t, Command c)
+		{
+		}
+
+		/// <summary>
+		/// Exception handler for the standby transport: marks this backup as
+		/// disposed and asks the owner (if any) to reconnect.
+		/// </summary>
+		public void onException(ITransport t, Exception error)
+		{
+			this.disposed = true;
+			if(failoverTransport != null)
+			{
+				this.failoverTransport.Reconnect();
+			}
+		}
+
+		/// <summary>The standby transport held by this backup.</summary>
+		public ITransport Transport
+		{
+			get
+			{
+				return transport;
+			}
+			set
+			{
+				transport = value;
+			}
+		}
+
+		/// <summary>The URI this backup was created for; defines equality.</summary>
+		public Uri Uri
+		{
+			get
+			{
+				return uri;
+			}
+			set
+			{
+				uri = value;
+			}
+		}
+
+		/// <summary>
+		/// True when this backup was flagged as disposed or when the held
+		/// transport reports itself as disposed.
+		/// </summary>
+		public bool Disposed
+		{
+			get
+			{
+				return disposed || (transport != null && transport.IsDisposed);
+			}
+			set
+			{
+				disposed = value;
+			}
+		}
+
+		/// <summary>
+		/// Java-style hash accessor kept for existing callers.
+		/// </summary>
+		/// <returns>The hash of the URI, or -1 when no URI is set.</returns>
+		public int hashCode()
+		{
+			return uri != null ? uri.GetHashCode() : -1;
+		}
+
+		/// <summary>
+		/// Java-style equality kept for existing callers.
+		/// </summary>
+		/// <returns>True when <paramref name="obj"/> is a BackupTransport with an equal URI (or both URIs are null).</returns>
+		public bool equals(Object obj)
+		{
+			BackupTransport other = obj as BackupTransport;
+			if(other == null)
+			{
+				return false;
+			}
+			return uri == null && other.uri == null ||
+				(uri != null && other.uri != null && uri.Equals(other.uri));
+		}
+
+		// The lower-case methods above do not override System.Object members, so
+		// collection operations such as List<T>.Contains would otherwise fall back
+		// to reference equality. These overrides route the framework through the
+		// URI-based comparison.
+
+		/// <summary>URI-based hash code; see <see cref="hashCode"/>.</summary>
+		public override int GetHashCode()
+		{
+			return hashCode();
+		}
+
+		/// <summary>URI-based equality; see <see cref="equals"/>.</summary>
+		public override bool Equals(Object obj)
+		{
+			return equals(obj);
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,1142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+	/// <summary>
+	/// A Transport that is made reliable by being able to fail over to another
+	/// transport when a transport failure is detected.
+	/// </summary>
+	public class FailoverTransport : ICompositeTransport, IComparable
+	{
+
+		// Source of per-instance ids; read and incremented in the constructor.
+		private static int _idCounter = 0;
+		// Id of this instance; CompareTo orders instances by it.
+		private int _id;
+
+		private bool disposed;
+		private bool connected;
+		// Candidate broker URIs. Writers (add/remove) take lock(uris).
+		private List<Uri> uris = new List<Uri>();
+		private CommandHandler _commandHandler;
+		private ExceptionHandler _exceptionHandler;
+
+		// Held by Start, Stop, Reconnect, Oneway, handleTransportFailure and doReconnect.
+		private Mutex reconnectMutex = new Mutex();
+		// Held while the backups list is inspected or modified by the reconnect task.
+		private Mutex backupMutex = new Mutex();
+		// Held by doReconnect for the duration of its back-off sleep; Stop acquires it to wait that sleep out.
+		private Mutex sleepMutex = new Mutex();
+		// Not referenced anywhere else in this class.
+		private Mutex listenerMutex = new Mutex();
+		private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+		// Commands awaiting a response, keyed by command id; guarded by lock(requestMap).
+		private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+		private Uri connectedTransportURI;
+		// URI of the transport that failed most recently; ConnectList moves it to the end of the list.
+		private Uri failedConnectTransportURI;
+		private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+		// Created lazily by Reconnect() once the transport has been started.
+		private TaskRunner reconnectTask = null;
+		private bool started;
+
+		// Delays are passed to Thread.Sleep, i.e. they are in milliseconds.
+		private int _initialReconnectDelay = 10;
+		private int _maxReconnectDelay = 1000 * 30;
+		private int _backOffMultiplier = 2;
+		private bool _useExponentialBackOff = true;
+		private bool _randomize = true;
+		// Set once a BrokerInfo command has been processed; reset on transport failure.
+		private bool initialized;
+		// Values <= 0 mean the number of attempts is not limited (see doReconnect).
+		private int _maxReconnectAttempts;
+		private int connectFailures;
+		private int _reconnectDelay = 10;
+		// Set when the reconnect attempt limit is reached; makes Oneway stop waiting.
+		private Exception connectionFailure;
+		private bool firstConnection = true;
+		//optionally always have a backup created
+		private bool _backup = false;
+		private List<BackupTransport> backups = new List<BackupTransport>();
+		private int _backupPoolSize = 1;
+		// Copied to the state tracker in Start().
+		private bool _trackMessages = false;
+		private int _maxCacheSize = 256;
+		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+
+		/// <summary>
+		/// Request timeout stored on this transport; defaults to
+		/// NMSConstants.defaultRequestTimeout.
+		/// </summary>
+		public TimeSpan RequestTimeout
+		{
+			get { return this.requestTimeout; }
+			set { this.requestTimeout = value; }
+		}
+
+		/// <summary>
+		/// Task run by the reconnect task runner. One iteration either attempts a
+		/// reconnect (when there is no connected transport) or builds backup
+		/// transports (when there is one).
+		/// </summary>
+		private class FailoverTask : Task
+		{
+			private FailoverTransport parent;
+
+			public FailoverTask(FailoverTransport p)
+			{
+				parent = p;
+			}
+
+			/// <summary>
+			/// Runs one iteration of reconnect/backup work.
+			/// </summary>
+			/// <returns>
+			/// True when a reconnect was attempted in this iteration (so another
+			/// iteration is wanted to build backups); false after building backups.
+			/// </returns>
+			public bool iterate()
+			{
+				bool result = false;
+				bool buildBackup = true;
+				try
+				{
+					parent.backupMutex.WaitOne();
+					if(parent.ConnectedTransport == null && !parent.disposed)
+					{
+						result = parent.doReconnect();
+						buildBackup = false;
+					}
+				}
+				finally
+				{
+					parent.backupMutex.ReleaseMutex();
+				}
+				if(buildBackup)
+				{
+					parent.buildBackups();
+				}
+				else
+				{
+					//build backups on the next iteration
+					// (this deliberately overrides whatever doReconnect returned)
+					result = true;
+					try
+					{
+						parent.reconnectTask.wakeup();
+					}
+					catch(ThreadInterruptedException)
+					{
+						Tracer.Debug("Reconnect task has been interrupted.");
+					}
+				}
+				return result;
+			}
+		}
+
+		/// <summary>
+		/// Creates an unstarted failover transport, enables transaction tracking
+		/// on its state tracker and assigns it the next instance id.
+		/// </summary>
+		public FailoverTransport()
+		{
+			stateTracker.TrackTransactions = true;
+
+			// Post-increment: the first instance gets id 0.
+			this._id = _idCounter++;
+		}
+
+		// Finalizer: runs the non-disposing path of Dispose(bool), which only
+		// marks this transport as disposed.
+		~FailoverTransport()
+		{
+			Dispose(false);
+		}
+
+		/// <summary>
+		/// Command handler installed on the underlying transport. Clears the
+		/// pending-request entry for responses, learns peer broker URIs from the
+		/// first BrokerInfo received after a (re)connect, then forwards the command
+		/// to the handler registered through <see cref="Command"/>.
+		/// </summary>
+		/// <param name="sender">The transport that delivered the command.</param>
+		/// <param name="command">The received command; may be null.</param>
+		public void onCommand(ITransport sender, Command command)
+		{
+			if(command != null)
+			{
+				if(command.IsResponse)
+				{
+					Command pending = null;
+					lock(requestMap)
+					{
+						int correlationId = ((Response) command).CorrelationId;
+						// A response for an id we are not waiting on is expected and
+						// simply ignored, so look it up without relying on exceptions.
+						if(requestMap.TryGetValue(correlationId, out pending))
+						{
+							requestMap.Remove(correlationId);
+						}
+					}
+					Tracked t = pending as Tracked;
+					if(t != null)
+					{
+						t.onResponses();
+					}
+				}
+				if(!initialized)
+				{
+					if(command.IsBrokerInfo)
+					{
+						BrokerInfo info = (BrokerInfo) command;
+						BrokerInfo[] peers = info.PeerBrokerInfos;
+						if(peers != null)
+						{
+							for(int i = 0; i < peers.Length; i++)
+							{
+								String brokerString = peers[i].BrokerURL;
+								add(brokerString);
+							}
+						}
+						initialized = true;
+					}
+				}
+			}
+			this.Command(sender, command);
+		}
+
+		/// <summary>
+		/// Exception handler installed on the underlying transport. Delegates to
+		/// <see cref="handleTransportFailure"/>; any error raised while doing so is
+		/// logged and not propagated to the transport that reported the failure.
+		/// </summary>
+		/// <param name="sender">The transport that reported the error.</param>
+		/// <param name="error">The reported error.</param>
+		public void onException(ITransport sender, Exception error)
+		{
+			try
+			{
+				handleTransportFailure(error);
+			}
+			catch(Exception e)
+			{
+				// Still best-effort (nothing is rethrown), but leave a trace so the
+				// failure is not lost silently.
+				Tracer.Error("Failed to handle transport failure: " + e.Message);
+			}
+		}
+
+		// No-op command handler; handleTransportFailure installs it on a transport
+		// that is being discarded so that late commands are ignored.
+		public void disposedOnCommand(ITransport sender, Command c)
+		{
+		}
+
+		// No-op exception handler; handleTransportFailure installs it on a
+		// transport that is being discarded so that late errors are ignored.
+		public void disposedOnException(ITransport sender, Exception e)
+		{
+		}
+
+		/// <summary>
+		/// Detaches and stops the currently connected transport (if any), records
+		/// its URI as the most recently failed one and, when this transport is
+		/// started, wakes the reconnect task.
+		/// </summary>
+		/// <param name="e">The error that caused the failure; only its message is logged.</param>
+		public void handleTransportFailure(Exception e)
+		{
+			// Atomically take ownership of the connected transport; if it is already
+			// null there is nothing to tear down and the method does nothing.
+			ITransport transport = connectedTransport.GetAndSet(null);
+			if(transport != null)
+			{
+				// Silence the old transport before stopping it.
+				transport.Command = new CommandHandler(disposedOnCommand);
+				transport.Exception = new ExceptionHandler(disposedOnException);
+				try
+				{
+					transport.Stop();
+				}
+				catch(Exception ex)
+				{
+					ex.GetType();	// Ignore errors but this lets us see the error during debugging
+				}
+
+				try
+				{
+					reconnectMutex.WaitOne();
+					bool reconnectOk = false;
+					if(started)
+					{
+						Tracer.Warn("Transport failed to " + ConnectedTransportURI + " , attempting to automatically reconnect due to: " + e.Message);
+						Tracer.Debug("Transport failed with the following exception:" + e.Message);
+						reconnectOk = true;
+					}
+
+					// Forces BrokerInfo peers to be re-learned on the next connection.
+					initialized = false;
+					failedConnectTransportURI = ConnectedTransportURI;
+					ConnectedTransportURI = null;
+					connected = false;
+					if(reconnectOk)
+					{
+						reconnectTask.wakeup();
+					}
+				}
+				finally
+				{
+					reconnectMutex.ReleaseMutex();
+				}
+			}
+		}
+
+		/// <summary>
+		/// Starts the transport. Does nothing if it is already started. Otherwise
+		/// the tracking settings are pushed to the state tracker and either the
+		/// tracked state is restored onto the already connected transport or a
+		/// reconnect is triggered.
+		/// </summary>
+		public void Start()
+		{
+			try
+			{
+				reconnectMutex.WaitOne();
+				// Note: this is logged even when the transport was already started.
+				Tracer.Debug("Started.");
+				if(started)
+				{
+					return;
+				}
+				started = true;
+				stateTracker.MaxCacheSize = MaxCacheSize;
+				stateTracker.TrackMessages = TrackMessages;
+				if(ConnectedTransport != null)
+				{
+					stateTracker.DoRestore(ConnectedTransport);
+				}
+				else
+				{
+					// Reconnect() takes reconnectMutex again on this same thread.
+					Reconnect();
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+		}
+
+		/// <summary>
+		/// Stops the transport. Does nothing if it is not started. Otherwise it is
+		/// marked as stopped and disposed, all backups are flagged as disposed and
+		/// dropped, the reconnect task is shut down and the connected transport
+		/// (if any) is stopped.
+		/// </summary>
+		public virtual void Stop()
+		{
+			ITransport transportToStop = null;
+			try
+			{
+				reconnectMutex.WaitOne();
+				// Note: this is logged even when the transport was never started.
+				Tracer.Debug("Stopped.");
+				if(!started)
+				{
+					return;
+				}
+				started = false;
+				disposed = true;
+				connected = false;
+				foreach(BackupTransport t in backups)
+				{
+					t.Disposed = true;
+				}
+				backups.Clear();
+
+				if(ConnectedTransport != null)
+				{
+					transportToStop = connectedTransport.GetAndSet(null);
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+			// doReconnect holds sleepMutex for the duration of its back-off sleep, so
+			// acquiring and releasing it here waits for such a sleep to finish.
+			try
+			{
+				sleepMutex.WaitOne();
+			}
+			finally
+			{
+				sleepMutex.ReleaseMutex();
+			}
+			// NOTE(review): reconnectTask is only created by Reconnect(); if Start()
+			// found an already connected transport and Reconnect() was never called,
+			// this looks like it would be null here - confirm.
+			reconnectTask.shutdown();
+			if(transportToStop != null)
+			{
+				transportToStop.Stop();
+			}
+		}
+
+		/// <summary>
+		/// Value (milliseconds) that <see cref="ReconnectDelay"/> is reset to after a
+		/// successful connection, and before every attempt when exponential
+		/// back-off is disabled.
+		/// </summary>
+		public int InitialReconnectDelay
+		{
+			get { return this._initialReconnectDelay; }
+			set { this._initialReconnectDelay = value; }
+		}
+
+		/// <summary>
+		/// Upper bound (milliseconds) applied to <see cref="ReconnectDelay"/> when
+		/// exponential back-off increases it.
+		/// </summary>
+		public int MaxReconnectDelay
+		{
+			get { return this._maxReconnectDelay; }
+			set { this._maxReconnectDelay = value; }
+		}
+
+		/// <summary>
+		/// Current pause (milliseconds) slept between failed connection rounds.
+		/// </summary>
+		public int ReconnectDelay
+		{
+			get { return this._reconnectDelay; }
+			set { this._reconnectDelay = value; }
+		}
+
+		/// <summary>
+		/// Factor <see cref="ReconnectDelay"/> is multiplied by after each failed
+		/// round when exponential back-off is enabled.
+		/// </summary>
+		public int ReconnectDelayExponent
+		{
+			get { return this._backOffMultiplier; }
+			set { this._backOffMultiplier = value; }
+		}
+
+		/// <summary>
+		/// The transport currently in use, or null when not connected. Backed by
+		/// an atomic reference.
+		/// </summary>
+		public ITransport ConnectedTransport
+		{
+			get { return this.connectedTransport.Value; }
+			set { this.connectedTransport.Value = value; }
+		}
+
+		/// <summary>
+		/// URI of the currently connected transport, or null when not connected.
+		/// </summary>
+		public Uri ConnectedTransportURI
+		{
+			get { return this.connectedTransportURI; }
+			set { this.connectedTransportURI = value; }
+		}
+
+		/// <summary>
+		/// Number of failed connection rounds after which reconnecting is given
+		/// up. Values of zero or less disable the limit.
+		/// </summary>
+		public int MaxReconnectAttempts
+		{
+			get { return this._maxReconnectAttempts; }
+			set { this._maxReconnectAttempts = value; }
+		}
+
+		/// <summary>
+		/// Whether the list of candidate URIs is shuffled before each round.
+		/// </summary>
+		public bool Randomize
+		{
+			get { return this._randomize; }
+			set { this._randomize = value; }
+		}
+
+		/// <summary>
+		/// Whether standby (backup) transports are built and used on reconnect.
+		/// </summary>
+		public bool Backup
+		{
+			get { return this._backup; }
+			set { this._backup = value; }
+		}
+
+		/// <summary>
+		/// Number of backup transports to keep when <see cref="Backup"/> is on.
+		/// </summary>
+		public int BackupPoolSize
+		{
+			get { return this._backupPoolSize; }
+			set { this._backupPoolSize = value; }
+		}
+
+		/// <summary>
+		/// Copied to the state tracker's TrackMessages setting by Start().
+		/// </summary>
+		public bool TrackMessages
+		{
+			get { return this._trackMessages; }
+			set { this._trackMessages = value; }
+		}
+
+		/// <summary>
+		/// Copied to the state tracker's MaxCacheSize setting by Start().
+		/// </summary>
+		public int MaxCacheSize
+		{
+			get { return this._maxCacheSize; }
+			set { this._maxCacheSize = value; }
+		}
+
+		/// <summary>
+		/// Tells whether a command belongs to connection shutdown, i.e. it is a
+		/// ShutdownInfo or a RemoveInfo.
+		/// </summary>
+		/// <param name="command">The command to inspect; may be null.</param>
+		/// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+		private bool IsShutdownCommand(Command command)
+		{
+			if(command == null)
+			{
+				return false;
+			}
+			if(command.IsShutdownInfo)
+			{
+				return true;
+			}
+			return command is RemoveInfo;
+		}
+
+		/// <summary>
+		/// Sends a command over the connected transport. While no transport is
+		/// connected the call blocks, polling once per second, until a transport
+		/// becomes available, this transport is disposed, or reconnecting has been
+		/// given up. Commands that expect a response are remembered in the request
+		/// map so they can be replayed after a failover.
+		/// </summary>
+		/// <param name="command">The command to send.</param>
+		/// <exception cref="ThreadInterruptedException">
+		/// Thrown when the calling thread is interrupted outside the polling sleep.
+		/// </exception>
+		/// <remarks>
+		/// If the wait ends without a transport and this transport is not disposed,
+		/// the recorded connection failure (or an IOException) is thrown.
+		/// </remarks>
+		public void Oneway(Command command)
+		{
+			Exception error = null;
+			try
+			{
+
+				try
+				{
+					reconnectMutex.WaitOne();
+
+					if(IsShutdownCommand(command) && ConnectedTransport == null)
+					{
+						if(command.IsShutdownInfo)
+						{
+							// Skipping send of ShutdownInfo command when not connected.
+							return;
+						}
+						if(command is RemoveInfo)
+						{
+							// Simulate response to RemoveInfo command
+							Response response = new Response();
+							response.CorrelationId = command.CommandId;
+							onCommand(this, response);
+							return;
+						}
+					}
+					// Keep trying until the message is sent.
+					for(int i = 0; !disposed; i++)
+					{
+						try
+						{
+							// Wait for transport to be connected.
+							ITransport transport = ConnectedTransport;
+							while(transport == null && !disposed
+								&& connectionFailure == null
+								// && !Thread.CurrentThread.isInterrupted()
+								)
+							{
+								Tracer.Info("Waiting for transport to reconnect.");
+								try
+								{
+									// Release so that the reconnect task can run
+									reconnectMutex.ReleaseMutex();
+									try
+									{
+										// Wait for something
+										Thread.Sleep(1000);
+									}
+									catch(ThreadInterruptedException e)
+									{
+										Thread.CurrentThread.Interrupt();		// KILROY - not needed
+										Tracer.Debug("Interupted: " + e);
+									}
+								}
+								finally
+								{
+									// Re-acquire before touching shared state again; the
+									// outer finally expects the mutex to be held.
+									reconnectMutex.WaitOne();
+								}
+								transport = ConnectedTransport;
+							}
+
+							if(transport == null)
+							{
+								// Previous loop may have exited due to use being
+								// disposed.
+								if(disposed)
+								{
+									error = new IOException("Transport disposed.");
+								}
+								else if(connectionFailure != null)
+								{
+									error = connectionFailure;
+								}
+								else
+								{
+									error = new IOException("Unexpected failure.");
+								}
+								break;
+							}
+
+							// If it was a request and it was not being tracked by
+							// the state tracker,
+							// then hold it in the requestMap so that we can replay
+							// it later.
+							Tracked tracked = stateTracker.track(command);
+							lock(requestMap)
+							{
+								// Indexer assignment (not Add): a command id that is
+								// already present is overwritten instead of raising
+								// ArgumentException, which the catch below would
+								// otherwise treat as a transport failure.
+								if(tracked != null && tracked.WaitingForResponse)
+								{
+									requestMap[command.CommandId] = tracked;
+								}
+								else if(tracked == null && command.ResponseRequired)
+								{
+									requestMap[command.CommandId] = command;
+								}
+							}
+
+							// Send the message.
+							try
+							{
+								transport.Oneway(command);
+								stateTracker.trackBack(command);
+							}
+							catch(Exception)
+							{
+
+								// If the command was not tracked.. we will retry in
+								// this method
+								if(tracked == null)
+								{
+
+									// since we will retry in this method.. take it
+									// out of the request
+									// map so that it is not sent 2 times on
+									// recovery
+									if(command.ResponseRequired)
+									{
+										// Same lock as every other requestMap access.
+										lock(requestMap)
+										{
+											requestMap.Remove(command.CommandId);
+										}
+									}
+
+									// Rethrow the exception so it will handled by
+									// the outer catch ("throw;" keeps the stack trace)
+									throw;
+								}
+
+								// Tracked commands are not retried here; the failure is
+								// dropped and the method returns below.
+							}
+
+							return;
+
+						}
+						catch(Exception e)
+						{
+							Tracer.Debug("Send Oneway attempt: " + i + " failed.");
+							handleTransportFailure(e);
+						}
+					}
+				}
+				finally
+				{
+					reconnectMutex.ReleaseMutex();
+				}
+			}
+			catch(ThreadInterruptedException)
+			{
+				// Some one may be trying to stop our thread.
+				Thread.CurrentThread.Interrupt();
+				throw new ThreadInterruptedException();
+			}
+			if(!disposed)
+			{
+				if(error != null)
+				{
+					throw error;
+				}
+			}
+		}
+
+		/*
+		public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
+		throw new AssertionError("Unsupported Method");
+		}
+		*/
+
+		/// <summary>
+		/// Not supported by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public Object Request(Object command)
+		{
+			throw new ApplicationException("FailoverTransport does not support Request(Object)");
+		}
+
+		/// <summary>
+		/// Not supported by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public Object Request(Object command, int timeout)
+		{
+			throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
+		}
+
+		/// <summary>
+		/// Adds every URI in <paramref name="u"/> that is not yet known to the
+		/// candidate list, then triggers a reconnect.
+		/// </summary>
+		/// <param name="u">URIs to add.</param>
+		public void add(Uri[] u)
+		{
+			lock(uris)
+			{
+				foreach(Uri candidate in u)
+				{
+					if(uris.Contains(candidate))
+					{
+						continue;
+					}
+					uris.Add(candidate);
+				}
+			}
+			Reconnect();
+		}
+
+		/// <summary>
+		/// Removes every URI in <paramref name="u"/> from the candidate list, then
+		/// triggers a reconnect.
+		/// </summary>
+		/// <param name="u">URIs to remove.</param>
+		public void remove(Uri[] u)
+		{
+			lock(uris)
+			{
+				foreach(Uri target in u)
+				{
+					uris.Remove(target);
+				}
+			}
+			Reconnect();
+		}
+
+		/// <summary>
+		/// Parses <paramref name="u"/> as a URI, adds it to the candidate list if it
+		/// is not yet known and triggers a reconnect. Any exception raised along
+		/// the way is logged and not propagated.
+		/// </summary>
+		/// <param name="u">Textual URI to add.</param>
+		public void add(String u)
+		{
+			try
+			{
+				Uri parsed = new Uri(u);
+				lock(uris)
+				{
+					bool known = uris.Contains(parsed);
+					if(!known)
+					{
+						uris.Add(parsed);
+					}
+				}
+
+				Reconnect();
+			}
+			catch(Exception failure)
+			{
+				Tracer.Error("Failed to parse URI: " + u + " because " + failure.Message);
+			}
+		}
+
+		/// <summary>
+		/// Wakes the reconnect task, creating it on first use. Has no effect other
+		/// than a debug message while the transport is not started.
+		/// </summary>
+		public void Reconnect()
+		{
+			try
+			{
+				reconnectMutex.WaitOne();
+
+				if(started)
+				{
+					if(reconnectTask == null)
+					{
+						// Lazily create the runner that drives FailoverTask.iterate().
+						Tracer.Debug("Creating reconnect task");
+						reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
+											"ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+					}
+
+					Tracer.Debug("Waking up reconnect task");
+					try
+					{
+						reconnectTask.wakeup();
+					}
+					catch(ThreadInterruptedException e)
+					{
+						e.GetType();	// Suppress warning
+						// Re-assert the interrupt so the caller can observe it.
+						Thread.CurrentThread.Interrupt();
+					}
+				}
+				else
+				{
+					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+		}
+
+		/// <summary>
+		/// Builds the ordered list of URIs for the next connection round: a
+		/// snapshot of the candidate URIs, shuffled when <see cref="Randomize"/> is
+		/// set, with the most recently failed URI (if it is a candidate) moved to
+		/// the end so that it is tried last.
+		/// </summary>
+		private List<Uri> ConnectList
+		{
+			get
+			{
+				// Snapshot under the same lock add()/remove() use, so a concurrent
+				// modification cannot corrupt the copy.
+				List<Uri> l;
+				lock(uris)
+				{
+					l = new List<Uri>(uris);
+				}
+
+				// Read the field once so the value removed is the value re-added.
+				Uri lastFailed = failedConnectTransportURI;
+				bool removed = false;
+				if(lastFailed != null)
+				{
+					removed = l.Remove(lastFailed);
+				}
+				if(Randomize)
+				{
+					// Fisher-Yates shuffle: every permutation is equally likely.
+					// The default Random seed is used; seeding with
+					// DateTime.Now.Millisecond allowed only 1000 distinct orders.
+					Random r = new Random();
+					for(int i = l.Count - 1; i > 0; i--)
+					{
+						int p = r.Next(i + 1);
+						Uri t = l[p];
+						l[p] = l[i];
+						l[i] = t;
+					}
+				}
+				if(removed)
+				{
+					l.Add(lastFailed);
+				}
+				return l;
+			}
+		}
+
+		/// <summary>
+		/// Prepares a freshly connected transport for use: starts it, announces
+		/// this client as fault tolerant, lets the state tracker restore its state
+		/// and re-sends every command still held in the request map.
+		/// </summary>
+		/// <param name="t">The transport to restore onto.</param>
+		protected void restoreTransport(ITransport t)
+		{
+			t.Start();
+
+			//send information to the broker - informing it we are an ft client
+			ConnectionControl control = new ConnectionControl();
+			control.FaultTolerant = true;
+			t.Oneway(control);
+
+			stateTracker.DoRestore(t);
+
+			// Snapshot the pending commands under the lock, send them outside it.
+			List<Command> pending;
+			lock(requestMap)
+			{
+				pending = new List<Command>(requestMap.Values);
+			}
+			foreach(Command replay in pending)
+			{
+				t.Oneway(replay);
+			}
+		}
+
+		/// <summary>
+		/// Whether the reconnect delay grows after each failed round. When off, the
+		/// delay is reset to the initial delay before every round.
+		/// </summary>
+		public bool UseExponentialBackOff
+		{
+			get { return this._useExponentialBackOff; }
+			set { this._useExponentialBackOff = value; }
+		}
+
+		/// <summary>
+		/// Returns the connected transport's URI as text, or "unconnected" when
+		/// there is none.
+		/// </summary>
+		public override String ToString()
+		{
+			if(ConnectedTransportURI == null)
+			{
+				return "unconnected";
+			}
+			return ConnectedTransportURI.ToString();
+		}
+
+		/// <summary>
+		/// Remote address of the connected transport, but only when that transport
+		/// is itself a <see cref="FailoverTransport"/>; otherwise null.
+		/// </summary>
+		public String RemoteAddress
+		{
+			get
+			{
+				// NOTE(review): any other ITransport implementation yields null
+				// here - confirm whether that restriction is intended.
+				FailoverTransport nested = ConnectedTransport as FailoverTransport;
+				return nested != null ? nested.RemoteAddress : null;
+			}
+		}
+
+		/// <summary>
+		/// Always true for this transport.
+		/// </summary>
+		public bool IsFaultTolerant
+		{
+			get
+			{
+				return true;
+			}
+		}
+
+		/// <summary>
+		/// Performs one connection round: first tries a prepared backup transport
+		/// (when backups are enabled), then each URI of <see cref="ConnectList"/> in
+		/// order. On failure the round counts towards
+		/// <see cref="MaxReconnectAttempts"/> and, unless the limit is reached, the
+		/// method sleeps for <see cref="ReconnectDelay"/> before returning.
+		/// </summary>
+		/// <returns>
+		/// False when nothing more needs to be done right now (already connected,
+		/// disposed, connected successfully, or reconnecting was given up); otherwise
+		/// true unless the transport was disposed in the meantime.
+		/// </returns>
+		bool doReconnect()
+		{
+			Exception failure = null;
+			try
+			{
+				reconnectMutex.WaitOne();
+
+				if(ConnectedTransport != null || disposed || connectionFailure != null)
+				{
+					return false;
+				}
+				else
+				{
+					List<Uri> connectList = ConnectList;
+					if(connectList.Count == 0)
+					{
+						failure = new IOException("No uris available to connect to.");
+					}
+					else
+					{
+						if(!UseExponentialBackOff)
+						{
+							ReconnectDelay = InitialReconnectDelay;
+						}
+						try
+						{
+							backupMutex.WaitOne();
+							if(Backup && backups.Count != 0)
+							{
+								// Promote the oldest prepared backup.
+								BackupTransport bt = backups[0];
+								backups.RemoveAt(0);
+								ITransport t = bt.Transport;
+								Uri uri = bt.Uri;
+								t.Command = new CommandHandler(onCommand);
+								t.Exception = new ExceptionHandler(onException);
+								try
+								{
+									if(started)
+									{
+										restoreTransport(t);
+									}
+									ReconnectDelay = InitialReconnectDelay;
+									failedConnectTransportURI = null;
+									ConnectedTransportURI = uri;
+									ConnectedTransport = t;
+									connectFailures = 0;
+									// Keep the Connected flag in step with the
+									// regular connect path below.
+									connected = true;
+									Tracer.Info("Successfully reconnected to backup " + uri);
+									return false;
+								}
+								catch(Exception e)
+								{
+									e.GetType();
+									Tracer.Debug("Backup transport failed");
+								}
+							}
+						}
+						finally
+						{
+							backupMutex.ReleaseMutex();
+						}
+
+						foreach(Uri uri in connectList)
+						{
+							if(ConnectedTransport != null || disposed)
+							{
+								break;
+							}
+
+							try
+							{
+								Tracer.Debug("Attempting connect to: " + uri);
+								ITransport t = TransportFactory.CompositeConnect(uri);
+								t.Command = new CommandHandler(onCommand);
+								t.Exception = new ExceptionHandler(onException);
+								t.Start();
+
+								if(started)
+								{
+									restoreTransport(t);
+								}
+
+								Tracer.Debug("Connection established");
+								ReconnectDelay = InitialReconnectDelay;
+								ConnectedTransportURI = uri;
+								ConnectedTransport = t;
+								connectFailures = 0;
+
+								if(firstConnection)
+								{
+									firstConnection = false;
+									Tracer.Info("Successfully connected to " + uri);
+								}
+								else
+								{
+									Tracer.Info("Successfully reconnected to " + uri);
+								}
+								connected = true;
+								return false;
+							}
+							catch(Exception e)
+							{
+								// Remember the last failure and move on to the next URI.
+								failure = e;
+								Tracer.Debug("Connect fail to: " + uri + ", reason: " + e);
+							}
+						}
+					}
+				}
+
+				// A limit of zero or less means "retry forever".
+				if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+				{
+					Tracer.Error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
+					connectionFailure = failure;
+
+					// NOTE(review): this calls the internal onException, which only
+					// acts when a transport is connected; it does not invoke the
+					// handler registered through the Exception property - confirm
+					// whether listeners should be notified here.
+					onException(this, connectionFailure);
+
+					return false;
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+			if(!disposed)
+			{
+
+				Tracer.Debug("Waiting " + ReconnectDelay + " ms before attempting connection. ");
+				try
+				{
+					// Held while sleeping so that Stop() can wait for the sleep to end.
+					sleepMutex.WaitOne();
+					try
+					{
+						Thread.Sleep(ReconnectDelay);
+					}
+					catch(ThreadInterruptedException e)
+					{
+						e.GetType();
+						Thread.CurrentThread.Interrupt();
+					}
+				}
+				finally
+				{
+					sleepMutex.ReleaseMutex();
+				}
+
+				if(UseExponentialBackOff)
+				{
+					// Exponential increment of reconnect delay.
+					ReconnectDelay *= ReconnectDelayExponent;
+					if(ReconnectDelay > MaxReconnectDelay)
+					{
+						ReconnectDelay = MaxReconnectDelay;
+					}
+				}
+			}
+			return !disposed;
+		}
+
+
+		/// <summary>
+		/// Tops up the pool of standby transports. Disposed backups are dropped
+		/// first; then, for each candidate URI other than the connected one that
+		/// has no backup yet, a transport is connected and started until the pool
+		/// holds <see cref="BackupPoolSize"/> entries. Does nothing when disposed,
+		/// when backups are disabled or when the pool is already full.
+		/// </summary>
+		/// <returns>Always false.</returns>
+		bool buildBackups()
+		{
+			try
+			{
+				backupMutex.WaitOne();
+				if(!disposed && Backup && backups.Count < BackupPoolSize)
+				{
+					List<Uri> connectList = ConnectList;
+
+					// Prune by index, back to front: removing inside a foreach over
+					// the same list throws InvalidOperationException.
+					for(int i = backups.Count - 1; i >= 0; i--)
+					{
+						if(backups[i].Disposed)
+						{
+							backups.RemoveAt(i);
+						}
+					}
+
+					foreach(Uri uri in connectList)
+					{
+						// Stop as soon as the pool is full.
+						if(backups.Count >= BackupPoolSize)
+						{
+							break;
+						}
+						if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+						{
+							try
+							{
+								BackupTransport bt = new BackupTransport(this);
+								bt.Uri = uri;
+
+								// Compare by URI through BackupTransport.equals so a
+								// second backup for the same broker is never created.
+								bool alreadyBackedUp = false;
+								foreach(BackupTransport existing in backups)
+								{
+									if(existing.equals(bt))
+									{
+										alreadyBackedUp = true;
+										break;
+									}
+								}
+
+								if(!alreadyBackedUp)
+								{
+									ITransport t = TransportFactory.CompositeConnect(uri);
+									t.Command = new CommandHandler(bt.onCommand);
+									t.Exception = new ExceptionHandler(bt.onException);
+									t.Start();
+									bt.Transport = t;
+									backups.Add(bt);
+								}
+							}
+							catch(Exception e)
+							{
+								e.GetType();
+								Tracer.Debug("Failed to build backup ");
+							}
+						}
+					}
+				}
+			}
+			finally
+			{
+				backupMutex.ReleaseMutex();
+			}
+			return false;
+		}
+
+		/// <summary>
+		/// True once the transport has been stopped or disposed.
+		/// </summary>
+		public bool IsDisposed
+		{
+			get { return this.disposed; }
+		}
+
+		/// <summary>
+		/// Value of the internal connected flag.
+		/// </summary>
+		public bool Connected
+		{
+			get { return this.connected; }
+		}
+
+		/// <summary>
+		/// Adds <paramref name="uri"/> to the candidate list (if not yet present),
+		/// which in turn triggers a reconnect.
+		/// </summary>
+		/// <param name="uri">The URI to add.</param>
+		public void Reconnect(Uri uri)
+		{
+			add(new Uri[] { uri });
+		}
+
+		/// <summary>
+		/// Not implemented by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public FutureResponse AsyncRequest(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+		}
+
+		/// <summary>
+		/// Not implemented by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public Response Request(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+		}
+
+		/// <summary>
+		/// Not implemented by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public Response Request(Command command, TimeSpan ts)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+		}
+
+		/// <summary>
+		/// Handler that received commands are forwarded to by onCommand.
+		/// </summary>
+		public CommandHandler Command
+		{
+			get { return this._commandHandler; }
+			set { this._commandHandler = value; }
+		}
+
+		/// <summary>
+		/// Exception handler registered on this transport.
+		/// </summary>
+		public ExceptionHandler Exception
+		{
+			get { return this._exceptionHandler; }
+			set { this._exceptionHandler = value; }
+		}
+
+		/// <summary>
+		/// True between a call to Start() and the following call to Stop().
+		/// </summary>
+		public bool IsStarted
+		{
+			get { return this.started; }
+		}
+
+		/// <summary>
+		/// Marks this transport as disposed and suppresses its finalizer.
+		/// </summary>
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		/// <summary>
+		/// Marks this transport as disposed.
+		/// </summary>
+		/// <param name="disposing">
+		/// True when called from <see cref="Dispose()"/>, false when called from
+		/// the finalizer. Both paths currently do the same thing.
+		/// </param>
+		public void Dispose(bool disposing)
+		{
+			if(disposing)
+			{
+				// Placeholder: nothing is released on the explicit Dispose() path yet.
+			}
+			disposed = true;
+		}
+
+		/// <summary>
+		/// Orders failover transports by their instance id.
+		/// </summary>
+		/// <param name="o">The object to compare with; must be a FailoverTransport.</param>
+		/// <returns>This instance's id minus the other instance's id.</returns>
+		/// <exception cref="ArgumentException">
+		/// Thrown when <paramref name="o"/> is null or not a FailoverTransport.
+		/// </exception>
+		public int CompareTo(Object o)
+		{
+			FailoverTransport other = o as FailoverTransport;
+			if(other == null)
+			{
+				throw new ArgumentException();
+			}
+
+			return this._id - other._id;
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+	/// <summary>
+	/// Transport factory that builds <see cref="FailoverTransport"/> instances
+	/// from composite URIs.
+	/// </summary>
+	public class FailoverTransportFactory : ITransportFactory
+	{
+		/// <summary>
+		/// Layers a MutexTransport and then a ResponseCorrelator over the given transport.
+		/// </summary>
+		/// <param name="transport">The transport to wrap.</param>
+		/// <returns>The outermost wrapper (the ResponseCorrelator).</returns>
+		private ITransport wrapTransport(ITransport transport)
+		{
+			ITransport serialized = new MutexTransport(transport);
+			return new ResponseCorrelator(serialized);
+		}
+
+		/// <summary>
+		/// Parses the location as a composite URI and returns the resulting
+		/// failover transport without any wrappers.
+		/// </summary>
+		/// <param name="location">The composite URI to parse.</param>
+		/// <returns>The unwrapped transport.</returns>
+		public ITransport CompositeConnect(Uri location)
+		{
+			URISupport.CompositeData parsed = URISupport.parseComposite(location);
+			return CreateTransport(parsed);
+		}
+
+		/// <summary>
+		/// Parses the location as a composite URI, builds the failover transport
+		/// and returns it wrapped by <c>wrapTransport</c>.
+		/// </summary>
+		/// <param name="location">The composite URI to parse.</param>
+		/// <returns>The wrapped transport.</returns>
+		public ITransport CreateTransport(Uri location)
+		{
+			URISupport.CompositeData parsed = URISupport.parseComposite(location);
+			ITransport failover = CreateTransport(parsed);
+			return wrapTransport(failover);
+		}
+
+		/// <summary>
+		/// Builds a failover transport configured from the composite's parameters
+		/// and passes the composite's components to its <c>add</c> method.
+		/// </summary>
+		/// <param name="compositData">The parsed composite URI.</param>
+		/// <returns>The configured failover transport.</returns>
+		public ITransport CreateTransport(URISupport.CompositeData compositData)
+		{
+			StringDictionary settings = compositData.Parameters;
+			FailoverTransport failover = CreateTransport(settings);
+			failover.add(compositData.Components);
+			return failover;
+		}
+
+		/// <summary>
+		/// Creates a new FailoverTransport and hands it, together with the given
+		/// parameters and an empty string, to <c>URISupport.SetProperties</c>.
+		/// </summary>
+		/// <param name="parameters">The settings passed to SetProperties.</param>
+		/// <returns>The newly created transport.</returns>
+		public FailoverTransport CreateTransport(StringDictionary parameters)
+		{
+			FailoverTransport created = new FailoverTransport();
+			URISupport.SetProperties(created, parameters, "");
+			return created;
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+	/// <summary>
+	/// An <see cref="ITransport"/> that additionally accepts and releases
+	/// arrays of URIs.
+	/// </summary>
+	public interface ICompositeTransport : ITransport
+	{
+		/// <summary>
+		/// Hands a set of URIs to the transport.
+		/// NOTE(review): presumably these become candidate endpoints for the
+		/// transport to connect to; confirm against the implementation.
+		/// </summary>
+		/// <param name="uris">The URIs to add.</param>
+		void add(Uri[] uris);
+
+		/// <summary>
+		/// Counterpart of <c>add</c>.
+		/// NOTE(review): presumably withdraws URIs that were added earlier;
+		/// confirm against the implementation.
+		/// </summary>
+		/// <param name="uris">The URIs to remove.</param>
+		void remove(Uri[] uris);
+	}
+}
+

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Fri Oct 24 14:10:22 2008
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS;
+
 using System;
+using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
@@ -26,11 +26,11 @@
 	/// <summary>
 	/// Represents the logical networking transport layer.
 	/// </summary>
-	public interface ITransport : IStartable, IDisposable
-    {
-        void Oneway(Command command);
-        
-        FutureResponse AsyncRequest(Command command);
+	public interface ITransport : IStartable, IDisposable, IStoppable
+	{
+		void Oneway(Command command);
+
+		FutureResponse AsyncRequest(Command command);
 
 		TimeSpan RequestTimeout
 		{
@@ -38,20 +38,25 @@
 			set;
 		}
 
-        Response Request(Command command);
+		Response Request(Command command);
 		Response Request(Command command, TimeSpan timeout);
-        
-        CommandHandler Command
+
+		CommandHandler Command
+		{
+			get;
+			set;
+		}
+
+		ExceptionHandler Exception
 		{
-            get;
-            set;
-        }
-		
-        ExceptionHandler Exception
+			get;
+			set;
+		}
+
+		bool IsDisposed
 		{
-            get;
-            set;
-        }
-    }
+			get;
+		}
+	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -20,7 +20,8 @@
 namespace Apache.NMS.ActiveMQ.Transport
 {
 	public interface ITransportFactory
-    {
+	{
 		ITransport CreateTransport(Uri location);
+		ITransport CompositeConnect(Uri location);
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Fri Oct 24 14:10:22 2008
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS.Util;
+
 using System;
 using System.IO;
 using System.Net.Sockets;
 using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.OpenWire;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
@@ -37,6 +37,7 @@
 		private BinaryWriter socketWriter;
 		private Thread readThread;
 		private bool started;
+		private bool disposed = false;
 		private AtomicBoolean closed = new AtomicBoolean(false);
 		private volatile bool seenShutdown;
 		private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
@@ -63,15 +64,15 @@
 		{
 			lock(myLock)
 			{
-				if (!started)
+				if(!started)
 				{
-					if (null == commandHandler)
+					if(null == commandHandler)
 					{
 						throw new InvalidOperationException(
 								"command cannot be null when Start is called.");
 					}
 
-					if (null == exceptionHandler)
+					if(null == exceptionHandler)
 					{
 						throw new InvalidOperationException(
 								"exception cannot be null when Start is called.");
@@ -179,6 +180,11 @@
 			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
 		}
 
+		public void Stop()
+		{
+			Close();
+		}
+
 		public void Close()
 		{
 			if(closed.CompareAndSet(false, true))
@@ -235,9 +241,9 @@
 					{
 						if(Thread.CurrentThread != readThread
 #if !NETCF
-							&& readThread.IsAlive
+ && readThread.IsAlive
 #endif
-							)
+)
 						{
 							TimeSpan waitTime;
 
@@ -273,6 +279,15 @@
 		protected void Dispose(bool disposing)
 		{
 			Close();
+			disposed = true;
+		}
+
+		public bool IsDisposed
+		{
+			get
+			{
+				return disposed;
+			}
 		}
 
 		public void ReadLoop()
@@ -337,7 +352,7 @@
 			set { this.commandHandler = value; }
 		}
 
-		public  ExceptionHandler Exception
+		public ExceptionHandler Exception
 		{
 			get { return exceptionHandler; }
 			set { this.exceptionHandler = value; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -16,14 +16,12 @@
  */
 
 using System;
+using System.Collections.Specialized;
 using System.Net;
 using System.Net.Sockets;
-using System.Collections.Specialized;
 using Apache.NMS.ActiveMQ.OpenWire;
 using Apache.NMS.ActiveMQ.Transport.Stomp;
-using Apache.NMS;
 using Apache.NMS.Util;
-using System.Threading;
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
@@ -103,7 +101,7 @@
 
 		#region ITransportFactory Members
 
-		public ITransport CreateTransport(Uri location)
+		public ITransport CompositeConnect(Uri location)
 		{
 			// Extract query parameters from broker Uri
 			StringDictionary map = URISupport.ParseQuery(location.Query);
@@ -136,6 +134,15 @@
 				transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
 			}
 
+			transport.RequestTimeout = this.requestTimeout;
+
+			return transport;
+		}
+
+		public ITransport CreateTransport(Uri location)
+		{
+			ITransport transport = CompositeConnect(location);
+
 			transport = new MutexTransport(transport);
 			transport = new ResponseCorrelator(transport);
 			transport.RequestTimeout = this.requestTimeout;

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+	/// <summary>
+	/// Selects an <see cref="ITransportFactory"/> by the scheme of a URI and
+	/// delegates transport creation to it. Factories are registered for the
+	/// "tcp" and "failover" schemes.
+	/// </summary>
+	public class TransportFactory
+	{
+
+		// Scheme name -> factory. Populated once in the static constructor and
+		// only read afterwards.
+		private static readonly Dictionary<String, ITransportFactory> TRANSPORT_FACTORYS = new Dictionary<String, ITransportFactory>();
+
+		static TransportFactory()
+		{
+			TRANSPORT_FACTORYS.Add("tcp", new TcpTransportFactory());
+			TRANSPORT_FACTORYS.Add("failover", new FailoverTransportFactory());
+		}
+
+		/// <summary>
+		/// Creates a normal transport by calling <c>CreateTransport</c> on the
+		/// factory registered for the location's scheme.
+		/// </summary>
+		/// <param name="location">The URI whose scheme selects the factory.</param>
+		/// <returns>the transport</returns>
+		public static ITransport CreateTransport(Uri location)
+		{
+			ITransportFactory tf = findTransportFactory(location);
+			return tf.CreateTransport(location);
+		}
+
+		/// <summary>
+		/// Creates a transport by calling <c>CompositeConnect</c> on the factory
+		/// registered for the location's scheme.
+		/// </summary>
+		/// <param name="location">The URI whose scheme selects the factory.</param>
+		/// <returns>the transport</returns>
+		public static ITransport CompositeConnect(Uri location)
+		{
+			ITransportFactory tf = findTransportFactory(location);
+			return tf.CompositeConnect(location);
+		}
+
+		/// <summary>
+		/// Finds the factory registered for the scheme of the given URI.
+		/// </summary>
+		/// <param name="location">The URI whose scheme is looked up.</param>
+		/// <returns>The registered factory; never null.</returns>
+		/// <exception cref="ArgumentNullException">location is null.</exception>
+		/// <exception cref="IOException">The URI reports no scheme.</exception>
+		/// <exception cref="ApplicationException">No factory is registered for the scheme.</exception>
+		private static ITransportFactory findTransportFactory(Uri location)
+		{
+			if(location == null)
+			{
+				throw new ArgumentNullException("location");
+			}
+
+			String scheme = location.Scheme;
+			if(scheme == null)
+			{
+				throw new IOException("Transport scheme not specified: [" + location + "]");
+			}
+
+			// The Dictionary indexer throws KeyNotFoundException for a missing key
+			// and never yields null, so probe with TryGetValue to reach the
+			// intended "does not exist" error for unknown schemes.
+			ITransportFactory tf;
+			if(!TRANSPORT_FACTORYS.TryGetValue(scheme, out tf) || tf == null)
+			{
+				throw new ApplicationException("Transport Factory for " + scheme + " does not exist.");
+			}
+			return tf;
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Fri Oct 24 14:10:22 2008
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transport;
+
 using System;
+using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
@@ -28,6 +28,7 @@
 		protected readonly ITransport next;
 		protected CommandHandler commandHandler;
 		protected ExceptionHandler exceptionHandler;
+		private bool disposed = false;
 
 		public TransportFilter(ITransport next)
 		{
@@ -107,12 +108,12 @@
 		{
 			if(commandHandler == null)
 			{
-				throw new InvalidOperationException ("command cannot be null when Start is called.");
+				throw new InvalidOperationException("command cannot be null when Start is called.");
 			}
 
 			if(exceptionHandler == null)
 			{
-				throw new InvalidOperationException ("exception cannot be null when Start is called.");
+				throw new InvalidOperationException("exception cannot be null when Start is called.");
 			}
 
 			this.next.Start();
@@ -141,6 +142,15 @@
 			{
 				this.next.Dispose();
 			}
+			disposed = true;
+		}
+
+		public bool IsDisposed
+		{
+			get
+			{
+				return disposed;
+			}
 		}
 
 		public CommandHandler Command
@@ -149,12 +159,15 @@
 			set { this.commandHandler = value; }
 		}
 
-		public  ExceptionHandler Exception
+		public ExceptionHandler Exception
 		{
 			get { return exceptionHandler; }
 			set { this.exceptionHandler = value; }
 		}
 
+		public virtual void Stop()
+		{
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj Fri Oct 24 14:10:22 2008
@@ -237,6 +237,7 @@
     <Compile Include="src\main\csharp\Commands\PartialCommand.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Commands\ProducerAck.cs" />
     <Compile Include="src\main\csharp\Commands\ProducerId.cs">
       <SubType>Code</SubType>
     </Compile>
@@ -303,6 +304,7 @@
       <SubType>Code</SubType>
     </Compile>
     <Compile Include="src\main\csharp\DispatchingThread.cs" />
+    <Compile Include="src\main\csharp\IOException.cs" />
     <Compile Include="src\main\csharp\ISynchronization.cs">
       <SubType>Code</SubType>
     </Compile>
@@ -700,12 +702,32 @@
     <Compile Include="src\main\csharp\Session.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\State\CommandVisitorAdapter.cs" />
+    <Compile Include="src\main\csharp\State\ConnectionState.cs" />
+    <Compile Include="src\main\csharp\State\ConnectionStateTracker.cs" />
+    <Compile Include="src\main\csharp\State\ConsumerState.cs" />
+    <Compile Include="src\main\csharp\State\ICommandVisitor.cs" />
+    <Compile Include="src\main\csharp\State\ProducerState.cs" />
+    <Compile Include="src\main\csharp\State\SessionState.cs" />
+    <Compile Include="src\main\csharp\State\SynchronizedObjects.cs" />
+    <Compile Include="src\main\csharp\State\ThreadSimulator.cs" />
+    <Compile Include="src\main\csharp\State\Tracked.cs" />
+    <Compile Include="src\main\csharp\State\TransactionState.cs" />
+    <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
+    <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\Task.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" />
     <Compile Include="src\main\csharp\TransactionContext.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Transport\Failover\BackupTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\Failover\FailoverTransport.cs" />
+    <Compile Include="src\main\csharp\Transport\Failover\FailoverTransportFactory.cs" />
     <Compile Include="src\main\csharp\Transport\FutureResponse.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Transport\ICompositeTransport.cs" />
     <Compile Include="src\main\csharp\Transport\ITransport.cs">
       <SubType>Code</SubType>
     </Compile>
@@ -731,6 +753,7 @@
     <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\Transport\TransportFactory.cs" />
     <Compile Include="src\main\csharp\Transport\TransportFilter.cs">
       <SubType>Code</SubType>
     </Compile>