You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/02/13 00:00:44 UTC

svn commit: r627169 [2/3] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/ src/main/csharp/OpenWire/V1/ src/main/csharp/OpenWire/V2/ src/main/csharp/Transport/ src/main/csharp/Transport/Stomp/ src/main/csharp/Transport/Tcp/...

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Feb 12 15:00:40 2008
@@ -16,6 +16,7 @@
  */
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Util;
 using Apache.NMS;
 using System;
 using System.Collections;
@@ -27,31 +28,32 @@
     /// </summary>
     public class Connection : IConnection
     {
-        private ITransport transport;
-        private ConnectionInfo info;
+    	private readonly Uri brokerUri;
+		private ITransport transport;
+		private readonly ConnectionInfo info;
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
         private BrokerInfo brokerInfo; // from broker
         private WireFormatInfo brokerWireFormatInfo; // from broker
-        private IList sessions = new ArrayList();
-        private IDictionary consumers = new Hashtable(); // TODO threadsafe
-        private bool asyncSend;
-        private bool connected;
-        private bool closed;
-        private long sessionCounter;
-        private long temporaryDestinationCounter;
+        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+        private bool asyncSend = false;
+        private bool connected = false;
+        private bool closed = false;
+        private long sessionCounter = 0;
+        private long temporaryDestinationCounter = 0;
         private long localTransactionCounter;
-        private bool closing;
-        private Util.AtomicBoolean started = new ActiveMQ.Util.AtomicBoolean(true);
+        private bool closing = false;
+        private readonly AtomicBoolean started = new AtomicBoolean(true);
     	private bool disposed = false;
         
-        public Connection(ITransport transport, ConnectionInfo info)
+        public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
         {
-            this.transport = transport;
-            this.info = info;
-            this.transport.Command = new CommandHandler(OnCommand);
-			this.transport.Exception = new ExceptionHandler(OnException);
-            this.transport.Start();
-        }
+        	this.brokerUri = connectionUri;
+			this.info = info;
+			this.transport = transport;
+			this.transport.Command = OnCommand;
+			this.transport.Exception = OnException;
+			this.transport.Start();
+		}
 
         ~Connection()
         {
@@ -66,7 +68,8 @@
 			get { return started.Value; }
 		}
 
-		
+		#region Properties
+
 		/// <summary>
 		/// This property indicates whether or not async send is enabled.
 		/// </summary>
@@ -75,8 +78,19 @@
 			get { return asyncSend; }
 			set { asyncSend = value; }
 		}
-		
-		
+
+		/// <summary>
+		/// This property sets the acknowledgment mode for the connection.
+		/// The URI parameter connection.ackmode can be set to a string value
+		/// that maps to the enumeration value.
+		/// </summary>
+		public string AckMode
+    	{
+			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+    	}
+
+		#endregion
+
 		/// <summary>
 		/// Starts asynchronous message delivery of incoming messages for this connection.
 		/// Synchronous delivery is unaffected.
@@ -120,25 +134,41 @@
         /// <summary>
         /// Creates a new session to work on this connection
         /// </summary>
-        public ISession CreateSession(AcknowledgementMode acknowledgementMode)
+        public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
         {
-            SessionInfo info = CreateSessionInfo(acknowledgementMode);
-            SyncRequest(info);
-            Session session = new Session(this, info, acknowledgementMode);
-            sessions.Add(session);
+			SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
+			SyncRequest(info);
+			Session session = new Session(this, info, sessionAcknowledgementMode);
+
+			// Set properties on session using parameters prefixed with "session."
+			System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(this.brokerUri.Query);
+			URISupport.SetProperties(session, map, "session.");
+
+			sessions.Add(session);
             return session;
         }
 
+		public void RemoveSession(Session session)
+		{
+			DisposeOf(session.SessionId);
+
+			if(!closing)
+			{
+				sessions.Remove(session);
+			}
+		}
+
 		public void Close()
 		{
-			if (!closed)
+			if(!closed)
 			{
 				closing = true;
-				foreach (Session session in sessions)
+				foreach(Session session in sessions)
 				{
 					session.Close();
 				}
 				sessions.Clear();
+
 				try
 				{
 					DisposeOf(ConnectionId);
@@ -148,6 +178,7 @@
 				{
 					Tracer.ErrorFormat("Error during connection close: {0}", ex);
 				}
+
 				transport.Dispose();
 				transport = null;
 				closed = true;
@@ -188,23 +219,29 @@
 		}
 		
         // Properties
-        
-        public ITransport ITransport
+
+		public Uri BrokerUri
+		{
+			get { return brokerUri; }
+		}
+		
+		public ITransport ITransport
         {
             get { return transport; }
             set { this.transport = value; }
         }
 
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return acknowledgementMode; }
-            set { this.acknowledgementMode = value; }
-        }
-        
-        public string ClientId
+		public AcknowledgementMode AcknowledgementMode
+		{
+			get { return acknowledgementMode; }
+			set { this.acknowledgementMode = value; }
+		}
+		
+		public string ClientId
         {
             get { return info.ClientId; }
-            set {
+            set
+			{
                 if (connected)
                 {
                     throw new NMSException("You cannot change the ClientId once the Connection is connected");
@@ -215,32 +252,26 @@
         
         public ConnectionId ConnectionId
         {
-            get {
-                return info.ConnectionId;
-            }
+            get { return info.ConnectionId; }
         }
         
         public BrokerInfo BrokerInfo
         {
-            get {
-                return brokerInfo;
-            }
+            get { return brokerInfo; }
         }
         
         public WireFormatInfo BrokerWireFormat
         {
-            get {
-                return brokerWireFormatInfo;
-            }
+            get { return brokerWireFormatInfo; }
         }
         
         // Implementation methods
-        
-        /// <summary>
+
+		/// <summary>
         /// Performs a synchronous request-response with the broker
         /// </summary>
         public Response SyncRequest(Command command)
-        {
+		{
             CheckConnected();
             Response response = transport.Request(command);
             if (response is ExceptionResponse)
@@ -251,21 +282,23 @@
             }
             return response;
         }
-        
-        public void OneWay(Command command)
-        {
-            CheckConnected();
-            transport.Oneway(command);
-        }
-        
-        public void DisposeOf(DataStructure objectId)
-        {
-            RemoveInfo command = new RemoveInfo();
-            command.ObjectId = objectId;
-            transport.Oneway(command);
-        }
-        
-        
+
+		public void OneWay(Command command)
+		{
+			CheckConnected();
+			transport.Oneway(command);
+		}
+
+		public void DisposeOf(DataStructure objectId)
+		{
+			RemoveInfo command = new RemoveInfo();
+			command.ObjectId = objectId;
+			// Ensure that the object is disposed to avoid potential race-conditions
+			// of trying to re-create the same object in the broker faster than
+			// the broker can dispose of the object.
+			SyncRequest(command);
+		}
+
         /// <summary>
         /// Creates a new temporary destination name
         /// </summary>
@@ -301,67 +334,38 @@
             {
                 connected = true;
                 // now lets send the connection and see if we get an ack/nak
-                SyncRequest(info);
-            }
-        }
-        
-        /// <summary>
-        /// Register a new consumer
-        /// </summary>
-        /// <param name="consumerId">A  ConsumerId</param>
-        /// <param name="consumer">A  MessageConsumer</param>
-        public void AddConsumer(ConsumerId consumerId, MessageConsumer consumer)
-        {
-            consumers[consumerId] = consumer;
-        }
-        
-        
-        /// <summary>
-        /// Remove a consumer
-        /// </summary>
-        /// <param name="consumerId">A  ConsumerId</param>
-        public void RemoveConsumer(ConsumerId consumerId)
-        {
-            consumers[consumerId] = null;
+                if(null == SyncRequest(info))
+                {
+                	throw new ConnectionClosedException();
+                }
+			}
         }
         
-        
-        /// <summary>
+		/// <summary>
         /// Handle incoming commands
         /// </summary>
-        /// <param name="transport">An ITransport</param>
+		/// <param name="commandTransport">An ITransport</param>
         /// <param name="command">A  Command</param>
-        protected void OnCommand(ITransport transport, Command command)
+        protected void OnCommand(ITransport commandTransport, Command command)
         {
-            if (command is MessageDispatch)
+            if(command is MessageDispatch)
             {
-                MessageDispatch dispatch = (MessageDispatch) command;
-                ConsumerId consumerId = dispatch.ConsumerId;
-                MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
-                if (consumer == null)
-                {
-					Tracer.Error("No such consumer active: " + consumerId);
-                }
-                else
-                {
-                    ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
-                    consumer.Dispatch(message);
-                }
+				DispatchMessage((MessageDispatch) command);
             }
-            else if (command is WireFormatInfo)
+            else if(command is WireFormatInfo)
             {
                 this.brokerWireFormatInfo = (WireFormatInfo) command;
             }
-            else if (command is BrokerInfo)
+            else if(command is BrokerInfo)
             {
                 this.brokerInfo = (BrokerInfo) command;
             }
-            else if (command is ShutdownInfo)
+            else if(command is ShutdownInfo)
             {
                 //ShutdownInfo info = (ShutdownInfo)command;
-                if( !closing && !closed )
+                if(!closing && !closed)
                 {
-                    OnException(transport, new NMSException("Broker closed this connection."));
+					OnException(commandTransport, new NMSException("Broker closed this connection."));
                 }
             }
             else
@@ -370,21 +374,44 @@
             }
         }
 
-        protected void OnException(ITransport sender, Exception exception)
+		protected void DispatchMessage(MessageDispatch dispatch)
+		{
+			bool dispatched = false;
+
+			foreach(Session session in sessions)
+			{
+				if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+				{
+					dispatched = true;
+					break;
+				}
+			}
+
+			if(!dispatched)
+			{
+				Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
+			}
+		}
+
+    	protected void OnException(ITransport sender, Exception exception)
         {
             Tracer.ErrorFormat("Transport Exception: {0}", exception.ToString());
-            if (ExceptionListener != null)
-                ExceptionListener(exception);
+            if(ExceptionListener != null)
+            {
+            	ExceptionListener(exception);
+            }
         }
 
 		internal void OnSessionException(Session sender, Exception exception)
 		{
 			Tracer.ErrorFormat("Session Exception: {0}", exception.ToString());
-			if (ExceptionListener != null)
+			if(ExceptionListener != null)
+			{
 				ExceptionListener(exception);
+			}
 		}
         
-        protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode)
+        protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
         {
             SessionInfo answer = new SessionInfo();
             SessionId sessionId = new SessionId();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Tue Feb 12 15:00:40 2008
@@ -82,25 +82,21 @@
 
     	public IConnection CreateConnection(string userName, string password)
         {
-			ConnectionInfo info = CreateConnectionInfo(userName, password);
-
-			ITransportFactory tcpTransportFactory = new TcpTransportFactory();
-
             Uri uri = brokerUri;
             // Do we need to strip off the activemq prefix??
-            if ("activemq".Equals(brokerUri.Scheme))
+            if("activemq".Equals(brokerUri.Scheme))
             {
-                uri = new Uri(brokerUri.AbsolutePath);
+                uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
             }
 
+			ConnectionInfo info = CreateConnectionInfo(userName, password);
+			ITransportFactory tcpTransportFactory = new TcpTransportFactory();
 			ITransport transport = tcpTransportFactory.CreateTransport(uri);
+			Connection connection = new Connection(uri, transport, info);
 
-			IConnection connection = new Connection(transport, info);
-			connection.ClientId = info.ClientId;
-
-			// Set properties on connection using parameters prefixed with "jms."
+			// Set properties on connection using parameters prefixed with "connection."
 			System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(brokerUri.Query);
-			URISupport.SetProperties(connection, map, "jms.");
+			URISupport.SetProperties(connection, map, "connection.");
 
 			return connection;
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs Tue Feb 12 15:00:40 2008
@@ -94,8 +94,11 @@
         {
             lock (semaphore)
             {
-                queue.Enqueue(message);
-                messageReceivedEventHandle.Set();
+				if(!m_bClosed)
+				{
+					queue.Enqueue(message);
+					messageReceivedEventHandle.Set();
+				}
             }
         }
         

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Tue Feb 12 15:00:40 2008
@@ -39,16 +39,21 @@
 		private readonly ConsumerInfo info;
 		private int maximumRedeliveryCount = 10;
 		private int redeliveryTimeout = 500;
-		private readonly Session session;
+		private Session session;
+		private Session ackSession;
 		protected bool disposed = false;
 
 		// Constructor internal to prevent clients from creating an instance.
 		internal MessageConsumer(Session session, ConsumerInfo info,
-		                         AcknowledgementMode acknowledgementMode)
+								 AcknowledgementMode acknowledgementMode)
 		{
 			this.session = session;
 			this.info = info;
 			this.acknowledgementMode = acknowledgementMode;
+			if(AcknowledgementMode.AutoAcknowledge == acknowledgementMode)
+			{
+				this.ackSession = (Session) session.Connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			}
 		}
 
 		~MessageConsumer()
@@ -94,19 +99,19 @@
 		public IMessage Receive()
 		{
 			CheckClosed();
-            return SetupAcknowledge(dispatcher.Dequeue());
+			return SetupAcknowledge(dispatcher.Dequeue());
 		}
 
 		public IMessage Receive(System.TimeSpan timeout)
 		{
 			CheckClosed();
-            return SetupAcknowledge(dispatcher.Dequeue(timeout));
+			return SetupAcknowledge(dispatcher.Dequeue(timeout));
 		}
 
 		public IMessage ReceiveNoWait()
 		{
 			CheckClosed();
-            return SetupAcknowledge(dispatcher.DequeueNoWait());
+			return SetupAcknowledge(dispatcher.DequeueNoWait());
 		}
 
 		public void Dispose()
@@ -129,7 +134,6 @@
 
 			try
 			{
-				session.Connection.DisposeOf(info.ConsumerId);
 				Close();
 			}
 			catch
@@ -152,9 +156,17 @@
 
 			// wake up any pending dequeue() call on the dispatcher
 			dispatcher.Close();
+			session.DisposeOf(info.ConsumerId);
+			session = null;
 
 			lock(this)
 			{
+				if(ackSession != null)
+				{
+					ackSession.Close();
+					ackSession = null;
+				}
+
 				closed = true;
 			}
 		}
@@ -174,6 +186,17 @@
 		/// <param name="message">An ActiveMQMessage</param>
 		public void Dispatch(ActiveMQMessage message)
 		{
+			lock(this)
+			{
+				if(ackSession != null)
+				{
+					message.Acknowledger += DoNothingAcknowledge;
+					MessageAck ack = CreateMessageAck(message);
+					Tracer.Debug("Sending AutoAck: " + ack);
+					ackSession.Connection.OneWay(ack);
+				}
+			}
+
 			dispatcher.Enqueue(message);
 		}
 
@@ -190,7 +213,7 @@
 					break;
 				}
 
-                message = SetupAcknowledge(message);
+				message = SetupAcknowledge(message);
 				// invoke listener. Exceptions caught by the dispatcher thread
 				listener(message);
 			}
@@ -209,36 +232,34 @@
 
 		protected IMessage SetupAcknowledge(IMessage message)
 		{
-            if (message == null)
-                return null;
+			if(null == message)
+			{
+				return null;
+			}
+
+			if(message is ActiveMQMessage)
+			{
+				ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+
+				if(AcknowledgementMode.ClientAcknowledge == acknowledgementMode)
+				{
+					activeMessage.Acknowledger += DoClientAcknowledge;
+				}
+				else if(AcknowledgementMode.AutoAcknowledge != acknowledgementMode)
+				{
+					activeMessage.Acknowledger += DoNothingAcknowledge;
+					DoClientAcknowledge(activeMessage);
+				}
+			}
 
-            if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
-            {
-                if (message is ActiveMQMessage)
-                {
-                    ActiveMQMessage activeMessage = (ActiveMQMessage)message;
-                    activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
-                }
-            }
-            else
-            {
-                if (message is ActiveMQMessage)
-                {
-                    ActiveMQMessage activeMessage = (ActiveMQMessage)message;
-                    activeMessage.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
-
-                    MessageAck ack = CreateMessageAck(activeMessage);
-                    Tracer.Debug("Sending Ack: " + ack);
-                    session.Connection.OneWay(ack);
-                }
-            }
 			return message;
 		}
 
-        protected void DoNothingAcknowledge(ActiveMQMessage message)
-        {
-        }
-        protected void DoClientAcknowledge(ActiveMQMessage message)
+		protected void DoNothingAcknowledge(ActiveMQMessage message)
+		{
+		}
+
+		protected void DoClientAcknowledge(ActiveMQMessage message)
 		{
 			MessageAck ack = CreateMessageAck(message);
 			Tracer.Debug("Sending Ack: " + ack);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Tue Feb 12 15:00:40 2008
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Util;
 using Apache.NMS;
 using System;
 
@@ -26,7 +25,8 @@
 	/// </summary>
 	public class MessageProducer : IMessageProducer
 	{
-		private readonly Session session;
+		private Session session;
+		private bool closed = false;
 		private readonly ProducerInfo info;
 		private long messageCounter = 0;
 
@@ -69,7 +69,7 @@
 
 			try
 			{
-				session.Connection.DisposeOf(info.ProducerId);
+				Close();
 			}
 			catch
 			{
@@ -79,6 +79,36 @@
 			disposed = true;
 		}
 
+		public void Close()
+		{
+			lock(this)
+			{
+				if(closed)
+				{
+					return;
+				}
+			}
+
+			session.DisposeOf(info.ProducerId);
+			session = null;
+
+			lock(this)
+			{
+				closed = true;
+			}
+		}
+
+		protected void CheckClosed()
+		{
+			lock(this)
+			{
+				if(closed)
+				{
+					throw new ConnectionClosedException();
+				}
+			}
+		}
+
 		public void Send(IMessage message)
 		{
 			Send(info.Destination, message);
@@ -101,7 +131,8 @@
 
 		protected void Send(IDestination destination, IMessage message, bool persistent, byte priority, TimeSpan timeToLive, bool specifiedTimeToLive)
 		{
-			ActiveMQMessage activeMessage = (ActiveMQMessage)message;
+			CheckClosed();
+			ActiveMQMessage activeMessage = (ActiveMQMessage) message;
 
 			if (!disableMessageID)
 			{
@@ -171,36 +202,43 @@
 
 		public IMessage CreateMessage()
 		{
+			CheckClosed();
 			return session.CreateMessage();
 		}
 
 		public ITextMessage CreateTextMessage()
 		{
+			CheckClosed();
 			return session.CreateTextMessage();
 		}
 
 		public ITextMessage CreateTextMessage(string text)
 		{
+			CheckClosed();
 			return session.CreateTextMessage(text);
 		}
 
 		public IMapMessage CreateMapMessage()
 		{
+			CheckClosed();
 			return session.CreateMapMessage();
 		}
 
 		public IObjectMessage CreateObjectMessage(object body)
 		{
+			CheckClosed();
 			return session.CreateObjectMessage(body);
 		}
 
 		public IBytesMessage CreateBytesMessage()
 		{
+			CheckClosed();
 			return session.CreateBytesMessage();
 		}
 
 		public IBytesMessage CreateBytesMessage(byte[] body)
 		{
+			CheckClosed();
 			return session.CreateBytesMessage(body);
 		}
 	}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/ConnectionControlMarshaller.cs Tue Feb 12 15:00:40 2008
@@ -86,13 +86,11 @@
     public override void TightMarshal2(OpenWireFormat wireFormat, Object o, BinaryWriter dataOut, BooleanStream bs) {
         base.TightMarshal2(wireFormat, o, dataOut, bs);
 
-        ConnectionControl info = (ConnectionControl)o;
         bs.ReadBoolean();
         bs.ReadBoolean();
         bs.ReadBoolean();
         bs.ReadBoolean();
         bs.ReadBoolean();
-
     }
 
     // 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V1/DataStructureSupportMarshaller.cs Tue Feb 12 15:00:40 2008
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/V2/ConnectionControlMarshaller.cs Tue Feb 12 15:00:40 2008
@@ -86,7 +86,6 @@
     public override void TightMarshal2(OpenWireFormat wireFormat, Object o, BinaryWriter dataOut, BooleanStream bs) {
         base.TightMarshal2(wireFormat, o, dataOut, bs);
 
-        ConnectionControl info = (ConnectionControl)o;
         bs.ReadBoolean();
         bs.ReadBoolean();
         bs.ReadBoolean();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Tue Feb 12 15:00:40 2008
@@ -26,21 +26,23 @@
 	/// </summary>
 	public class Session : ISession
 	{
-		private AcknowledgementMode acknowledgementMode;
+		private readonly AcknowledgementMode acknowledgementMode;
 		private bool asyncSend;
 		private Connection connection;
 		private long consumerCounter;
-		private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
 		private bool dispatchAsync;
-		private DispatchingThread dispatchingThread;
+		private readonly DispatchingThread dispatchingThread;
 		private bool exclusive;
-		private SessionInfo info;
+		private readonly SessionInfo info;
 		private int maximumPendingMessageLimit;
 		private int prefetchSize = 1000;
 		private byte priority;
 		private long producerCounter;
 		private bool retroactive;
-		private TransactionContext transactionContext;
+		private readonly TransactionContext transactionContext;
+		internal bool startedAsyncDelivery = false;
 		private bool disposed = false;
 
 		public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
@@ -50,10 +52,8 @@
 			this.acknowledgementMode = acknowledgementMode;
 			this.asyncSend = connection.AsyncSend;
 			transactionContext = new TransactionContext(this);
-			dispatchingThread =
-					new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
-			dispatchingThread.ExceptionListener +=
-					new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+			dispatchingThread = new DispatchingThread(DispatchAsyncMessages);
+			dispatchingThread.ExceptionListener += dispatchingThread_ExceptionListener;
 		}
 
 		~Session()
@@ -166,7 +166,7 @@
 
 			try
 			{
-				connection.DisposeOf(info.SessionId);
+				Close();
 			}
 			catch
 			{
@@ -184,10 +184,27 @@
 		public IMessageProducer CreateProducer(IDestination destination)
 		{
 			ProducerInfo command = CreateProducerInfo(destination);
-			connection.SyncRequest(command);
-			return new MessageProducer(this, command);
-		}
+			ProducerId producerId = command.ProducerId;
+			MessageProducer producer = null;
+
+			try
+			{
+				producer = new MessageProducer(this, command);
+				connection.SyncRequest(command);
+				producers[producerId] = producer;
+			}
+			catch(Exception)
+			{
+				if(producer != null)
+				{
+					producer.Close();
+				}
+
+				throw;
+			}
 
+			return producer;
+		}
 
 		public IMessageConsumer CreateConsumer(IDestination destination)
 		{
@@ -206,22 +223,24 @@
 			command.AcknowledgementMode = acknowledgementMode;
 
 			ConsumerId consumerId = command.ConsumerId;
+			MessageConsumer consumer = null;
 
 			try
 			{
-				MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+				consumer = new MessageConsumer(this, command, acknowledgementMode);
 				// lets register the consumer first in case we start dispatching messages immediately
-				connection.AddConsumer(consumerId, consumer);
-
-				connection.SyncRequest(command);
-
 				consumers[consumerId] = consumer;
+				connection.SyncRequest(command);
 				return consumer;
 			}
-			catch(Exception e)
+			catch(Exception)
 			{
-				connection.RemoveConsumer(consumerId);
-				throw e;
+				if(consumer != null)
+				{
+					consumer.Close();
+				}
+
+				throw;
 			}
 		}
 
@@ -235,23 +254,26 @@
 			ConsumerId consumerId = command.ConsumerId;
 			command.SubscriptionName = name;
 			command.NoLocal = noLocal;
+			MessageConsumer consumer = null;
 
 			try
 			{
-				MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+				consumer = new MessageConsumer(this, command, acknowledgementMode);
 				// lets register the consumer first in case we start dispatching messages immediately
-				connection.AddConsumer(consumerId, consumer);
-
-				connection.SyncRequest(command);
-
 				consumers[consumerId] = consumer;
-				return consumer;
+				connection.SyncRequest(command);
 			}
-			catch(Exception e)
+			catch(Exception)
 			{
-				connection.RemoveConsumer(consumerId);
-				throw e;
+				if(consumer != null)
+				{
+					consumer.Close();
+				}
+
+				throw;
 			}
+
+			return consumer;
 		}
 
 		public IQueue GetQueue(string name)
@@ -368,8 +390,20 @@
 
 		public void Close()
 		{
-			// To do: what about session id?
+			connection.RemoveSession(this);
 			StopAsyncDelivery();
+			foreach(MessageConsumer consumer in GetConsumers())
+			{
+				consumer.Close();
+			}
+			consumers.Clear();
+
+			foreach(MessageProducer producer in GetProducers())
+			{
+				producer.Close();
+			}
+			producers.Clear();
+			connection = null;
 		}
 
 		#endregion
@@ -424,9 +458,28 @@
 
 		public void DisposeOf(ConsumerId objectId)
 		{
+			connection.DisposeOf(objectId);
 			consumers.Remove(objectId);
-			connection.RemoveConsumer(objectId);
+		}
+
+		public void DisposeOf(ProducerId objectId)
+		{
 			connection.DisposeOf(objectId);
+			producers.Remove(objectId);
+		}
+
+		public bool DispatchMessage(ConsumerId consumerId, Message message)
+		{
+			bool dispatched = false;
+			MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+
+			if(consumer != null)
+			{
+				consumer.Dispatch((ActiveMQMessage) message);
+				dispatched = true;
+			}
+
+			return dispatched;
 		}
 
 		/// <summary>
@@ -443,7 +496,6 @@
 			}
 		}
 
-
 		/// <summary>
 		/// Returns a copy of the current consumers in a thread safe way to avoid concurrency
 		/// problems if the consumers are changed in another thread
@@ -456,6 +508,18 @@
 			}
 		}
 
+		/// <summary>
+		/// Returns a copy of the current consumers in a thread safe way to avoid concurrency
+		/// problems if the consumers are changed in another thread
+		/// </summary>
+		protected ICollection GetProducers()
+		{
+			lock(producers.SyncRoot)
+			{
+				return new ArrayList(producers.Values);
+			}
+		}
+
 		protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
 		{
 			ConsumerInfo answer = new ConsumerInfo();
@@ -519,7 +583,11 @@
 
 		internal void StopAsyncDelivery()
 		{
-			dispatchingThread.Stop();
+			if(startedAsyncDelivery)
+			{
+				dispatchingThread.Stop();
+				startedAsyncDelivery = false;
+			}
 		}
 
 		internal void StartAsyncDelivery(Dispatcher dispatcher)
@@ -528,7 +596,9 @@
 			{
 				dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
 			}
+
 			dispatchingThread.Start();
+			startedAsyncDelivery = true;
 		}
 	}
-}
\ No newline at end of file
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Tue Feb 12 15:00:40 2008
@@ -33,7 +33,7 @@
     {
         private TransactionId transactionId;
         private Session session;
-        private ArrayList synchronizations = new ArrayList();
+        private ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
         
         public TransactionContext(Session session)
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/FutureResponse.cs Tue Feb 12 15:00:40 2008
@@ -30,6 +30,11 @@
     {
 	    
         private static int maxWait = -1;
+        public int Timeout
+        {
+        	get { return maxWait; }
+        	set { maxWait = value; }
+        }
 
         private readonly CountDownLatch latch = new CountDownLatch(1);
         private Response response;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ResponseCorrelator.cs Tue Feb 12 15:00:40 2008
@@ -30,34 +30,42 @@
     /// </summary>
     public class ResponseCorrelator : TransportFilter
     {
-
-        private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
-        private readonly Object mutex = new Object();
+		private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
+		private readonly Object mutex = new Object();
         private short nextCommandId;
+        private int requestTimeout = -1;
 
-        public ResponseCorrelator(ITransport next) : base(next) {
+        public ResponseCorrelator(ITransport next, int requestTimeout) : base(next)
+		{
+			this.requestTimeout = requestTimeout;
         }
 
-        short GetNextCommandId() {
-            lock(mutex) {
+        short GetNextCommandId()
+		{
+            lock(mutex)
+			{
                 return ++nextCommandId;
             }
         }
 
         public override void Oneway(Command command)
         {
-            command.CommandId = GetNextCommandId();
+			int commandId = GetNextCommandId();
+
+            command.CommandId = commandId;
             command.ResponseRequired = false;
             next.Oneway(command);
         }
 
         public override FutureResponse AsyncRequest(Command command)
         {
-            command.CommandId = GetNextCommandId();
+			int commandId = GetNextCommandId();
+
+        	command.CommandId = commandId;
             command.ResponseRequired = true;
             FutureResponse future = new FutureResponse();
-            requestMap[command.CommandId] = future;
-            next.Oneway(command);
+            requestMap[commandId] = future;
+			next.Oneway(command);
             return future;
 
         }
@@ -65,15 +73,18 @@
         public override Response Request(Command command)
         {
             FutureResponse future = AsyncRequest(command);
+            future.Timeout = requestTimeout;
             Response response = future.Response;
             if (response != null && response is ExceptionResponse)
             {
                 ExceptionResponse er = (ExceptionResponse) response;
                 BrokerError brokerError = er.Exception;
-				if (brokerError == null) {
+				if (brokerError == null)
+				{
 	                throw new BrokerException();
 				}
-				else {
+				else
+				{
 	                throw new BrokerException(brokerError);
 				}
             }
@@ -82,37 +93,41 @@
 
         protected override void OnCommand(ITransport sender, Command command)
         {
-            if( command is Response ) {
-
+            if(command is Response)
+			{
                 Response response = (Response) command;
-                FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
-                if (future != null)
-                {
-                    if (response is ExceptionResponse)
-                    {
-                        ExceptionResponse er = (ExceptionResponse) response;
-                        BrokerError brokerError = er.Exception;
-                        BrokerException exception = new BrokerException(brokerError);
-                        this.exceptionHandler(this, exception);
-                    }
-                    future.Response = response;
-                }
-                else
+				int correlationId = response.CorrelationId;
+
+				FutureResponse future = (FutureResponse) requestMap[correlationId];
+                
+				if(future != null)
                 {
-                    if (command is ShutdownInfo)
-                    {
-                        // lets shutdown
-                        this.commandHandler(sender, command);
-                    }
-                    else {
-                        Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
-                    }
-                }
-            } else {
+					requestMap.Remove(correlationId);
+					future.Response = response;
+
+					if(response is ExceptionResponse)
+					{
+						ExceptionResponse er = (ExceptionResponse) response;
+						BrokerError brokerError = er.Exception;
+						BrokerException exception = new BrokerException(brokerError);
+						this.exceptionHandler(this, exception);
+					}
+				}
+				else
+				{
+					Tracer.Error("Unknown response ID: " + response.CommandId + " for response: " + response);
+				}
+            }
+            else if(command is ShutdownInfo)
+            {
+                // lets shutdown
+                this.commandHandler(sender, command);
+            }
+			else
+			{
                 this.commandHandler(sender, command);
             }
         }
-
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompFrameStream.cs Tue Feb 12 15:00:40 2008
@@ -63,20 +63,33 @@
 				WriteHeader("content-length", contentLength);
 			}
 		}
-		
+
 		public void WriteCommand(Command command, String name)
 		{
+			WriteCommand(command, name, false);
+		}
+
+		public void WriteCommand(Command command, String name, bool ignoreErrors)
+		{
 			builder.Append(name);
 			builder.Append(NEWLINE);
-			if (command.ResponseRequired)
+			if(command.ResponseRequired)
 			{
-				WriteHeader("receipt", command.CommandId);
+				if(ignoreErrors)
+				{
+					WriteHeader("receipt", "ignore:" + command.CommandId);
+				}
+				else
+				{
+					WriteHeader("receipt", command.CommandId);
+				}
 			}
 		}
 		
 		public void WriteHeader(String name, Object value)
 		{
-			if (value != null) {
+			if (value != null)
+			{
 				builder.Append(name);
 				builder.Append(SEPARATOR);
 				builder.Append(value);
@@ -86,7 +99,8 @@
 		
 		public void WriteHeader(String name, bool value)
 		{
-			if (value) {
+			if (value)
+			{
 				builder.Append(name);
 				builder.Append(SEPARATOR);
 				builder.Append("true");

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs Tue Feb 12 15:00:40 2008
@@ -14,9 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System.Reflection;
 using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.OpenWire.V1;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS;
 using System;
@@ -89,8 +87,8 @@
 					Response response = new Response();
 					response.CorrelationId = command.CommandId;
 					SendCommand(response);
+					Tracer.Debug("#### Autorespond to command: " + o.GetType());
 				}
-				Tracer.Debug("#### Ignored command: " + o.GetType());
 			}
 			else
 			{
@@ -182,15 +180,22 @@
 
 		protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
 		{
-			if (command == "RECEIPT" || command == "CONNECTED")
+			if(command == "RECEIPT" || command == "CONNECTED")
 			{
 				string text = RemoveHeader(headers, "receipt-id");
-				if (text != null)
+				if(text != null)
 				{
     				Response answer = new Response();
+					if(text.StartsWith("ignore:"))
+					{
+						text = text.Substring("ignore:".Length);
+					}
+
 					answer.CorrelationId = Int32.Parse(text);
 				    return answer;
-				} else if( command == "CONNECTED") {
+				}
+				else if(command == "CONNECTED")
+				{
                     text = RemoveHeader(headers, "response-id");
                     if (text != null)
                     {
@@ -200,20 +205,31 @@
                     }
 				}
 			}
-			else if (command == "ERROR")
+			else if(command == "ERROR")
 			{
-				ExceptionResponse answer = new ExceptionResponse();
 				string text = RemoveHeader(headers, "receipt-id");
-				if (text != null)
+
+				if(text != null && text.StartsWith("ignore:"))
 				{
-					answer.CorrelationId = Int32.Parse(text);
+					Response answer = new Response();
+					answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
+					return answer;
+				}
+				else
+				{
+					ExceptionResponse answer = new ExceptionResponse();
+					if(text != null)
+					{
+						answer.CorrelationId = Int32.Parse(text);
+					}
+
+					BrokerError error = new BrokerError();
+					error.Message = RemoveHeader(headers, "message");
+					error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
+					// TODO is this the right header?
+					answer.Exception = error;
+					return answer;
 				}
-				
-				BrokerError error = new BrokerError();
-				error.Message = RemoveHeader(headers, "message");
-				error.ExceptionClass = RemoveHeader(headers, "exceptionClass"); // TODO is this the right header?
-				answer.Exception = error;
-				return answer;
 			}
 			else if (command == "MESSAGE")
 			{
@@ -236,11 +252,6 @@
 				message = new ActiveMQTextMessage(encoding.GetString(content, 0, content.Length));
 			}
 
-			if (message is ActiveMQTextMessage)
-			{
-				ActiveMQTextMessage textMessage = message as ActiveMQTextMessage;
-			}
-			
 			// TODO now lets set the various headers
 			
 			message.Type = RemoveHeader(headers, "type");
@@ -309,6 +320,7 @@
 		protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
 		{
 			ss.WriteCommand(command, "DISCONNECT");
+			System.Diagnostics.Debug.Assert(!command.ResponseRequired);
 			ss.Flush();
 		}
 
@@ -321,12 +333,7 @@
             ss.WriteHeader("selector", command.Selector);
             if ( command.NoLocal )
                 ss.WriteHeader("no-local", command.NoLocal);
-
-			if ( AcknowledgementMode.ClientAcknowledge == command.AcknowledgementMode
-				|| AcknowledgementMode.AutoClientAcknowledge == command.AcknowledgementMode
-                || AcknowledgementMode.Transactional == command.AcknowledgementMode 
-                )
-				ss.WriteHeader("ack", "client");
+			ss.WriteHeader("ack", "client");
 
 			// ActiveMQ extensions to STOMP
 			ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
@@ -346,19 +353,19 @@
 		protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
 		{
 			object id = command.ObjectId;
+
 			if (id is ConsumerId)
 			{
 				ConsumerId consumerId = id as ConsumerId;
 				ss.WriteCommand(command, "UNSUBSCRIBE");
-				ss.WriteHeader("id", StompHelper.ToStomp(consumerId));				
+				ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
 				ss.Flush();
                 consumers.Remove(consumerId);
             }
-		    // When a session is removed, it needs to remove it's consumers too.
-            if (id is SessionId)
+            else if (id is SessionId)
             {
-                
-                // Find all the consumer that were part of the session.
+				// When a session is removed, it needs to remove it's consumers too.
+				// Find all the consumer that were part of the session.
                 SessionId sessionId = (SessionId) id;
                 ArrayList matches = new ArrayList();
                 foreach (DictionaryEntry entry in consumers)
@@ -370,6 +377,8 @@
                     }
                 }
 
+            	bool unsubscribedConsumer = false;
+
                 // Un-subscribe them.
                 foreach (ConsumerId consumerId in matches)
                 {
@@ -377,8 +386,34 @@
                     ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
                     ss.Flush();
                     consumers.Remove(consumerId);
+                	unsubscribedConsumer = true;
                 }
+
+				if(!unsubscribedConsumer && command.ResponseRequired)
+				{
+					ss.WriteCommand(command, "UNSUBSCRIBE", true);
+					ss.WriteHeader("id", sessionId);
+					ss.Flush();
+				}
             }
+			else if(id is ProducerId)
+			{
+				if(command.ResponseRequired)
+				{
+					ss.WriteCommand(command, "UNSUBSCRIBE", true);
+					ss.WriteHeader("id", id);
+					ss.Flush();
+				}
+			}
+			else if(id is ConnectionId)
+			{
+				if(command.ResponseRequired)
+				{
+					ss.WriteCommand(command, "UNSUBSCRIBE", true);
+					ss.WriteHeader("id", id);
+					ss.Flush();
+				}
+			}
 		}
 		
 		
@@ -458,12 +493,14 @@
 		
 		protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
 		{
-			ss.WriteCommand(command, "ACK");
+			ss.WriteCommand(command, "ACK", true);
 			
 			// TODO handle bulk ACKs?
-            ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
-			if( command.TransactionId!=null )
+			ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
+			if(command.TransactionId != null)
+			{
                 ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+			}
 
 			ss.Flush();
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Feb 12 15:00:40 2008
@@ -30,19 +30,21 @@
     /// </summary>
     public class TcpTransport : ITransport
     {
-		private readonly object initLock = "initLock";
-        private readonly Socket socket;
-    	private IWireFormat wireformat;
+		private readonly object initLock = new object();
+		private readonly Socket socket;
+		private IWireFormat wireformat;
         private BinaryReader socketReader;
-		private readonly object socketReaderLock = "socketReaderLock";
+		private readonly object socketReaderLock = new object();
         private BinaryWriter socketWriter;
-		private readonly object socketWriterLock = "socketWriterLock";
+		private readonly object socketWriterLock = new object();
 		private Thread readThread;
         private bool started;
         private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
         
         private CommandHandler commandHandler;
         private ExceptionHandler exceptionHandler;
+		private const int MAX_THREAD_WAIT = 30000;
+
         
         public TcpTransport(Socket socket, IWireFormat wireformat)
         {
@@ -72,7 +74,7 @@
             		}
 
             		started = true;
-	                
+
 					// As reported in AMQ-988 it appears that NetworkStream is not thread safe
 					// so lets use an instance for each of the 2 streams
 					socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
@@ -125,11 +127,11 @@
 
         public void Close()
         {
-			if (closed.CompareAndSet(false, true))
+			if(closed.CompareAndSet(false, true))
 			{
-                lock (initLock)
+                lock(initLock)
                 {
-                        try
+					try
 					{
 						socket.Shutdown(SocketShutdown.Both);
 					}
@@ -137,7 +139,7 @@
 					{
 					}
 
-					lock (socketWriterLock)
+					lock(socketWriterLock)
 					{
 						if(null != socketWriter)
 						{
@@ -146,7 +148,7 @@
 						}
 					}
 
-					lock (socketReaderLock)
+					lock(socketReaderLock)
 					{
 						if(null != socketReader)
 						{
@@ -157,20 +159,25 @@
 
 					socket.Close();
 
-					if(null != readThread
-						&& Thread.CurrentThread != readThread
+					if(null != readThread)
+					{
+						if(Thread.CurrentThread != readThread
 #if !NETCF
-						&& readThread.IsAlive
+							&& readThread.IsAlive
 #endif
-						)
-					{
-						readThread.Abort();
-						readThread.Join();
+							)
+						{
+							if(!readThread.Join(MAX_THREAD_WAIT))
+							{
+								readThread.Abort();
+							}
+						}
+
 						readThread = null;
 					}
-				}
 
-				started = false;
+					started = false;
+				}
 			}
         }
 
@@ -195,32 +202,35 @@
             // An exception in the command handler may not be fatal to the transport, so
             // these are simply reported to the exceptionHandler.
             //
-            while (!closed.Value)
+            while(!closed.Value)
             {
                 Command command = null;
-                try
+
+				try
                 {
                     command = (Command) Wireformat.Unmarshal(socketReader);
                 }
                 catch(Exception ex)
                 {
-                    if (!closed.Value)
-                    {
+                    command = null;
+					if(!closed.Value)
+					{
 						// Close the socket as there's little that can be done with this transport now.
 						Close();
 						this.exceptionHandler(this, ex);
-                        break;
-                    }
+					}
+
+                	break;
                 }
 
                 try
                 {
-					if (command != null)
+					if(command != null)
 					{
 						this.commandHandler(this, command);
 					}
                 }
-                catch (Exception e)
+                catch(Exception e)
                 {
                     this.exceptionHandler(this, e);
                 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Tue Feb 12 15:00:40 2008
@@ -28,18 +28,35 @@
 {
 	public class TcpTransportFactory : ITransportFactory
 	{
-		private bool useLogging = false;
-
 		public TcpTransportFactory()
 		{
 		}
 
+		#region Properties
+
+		private bool useLogging = false;
 		public bool UseLogging
 		{
 			get { return useLogging; }
 			set { useLogging = value; }
 		}
 
+		private string wireFormat = "OpenWire";
+		public string WireFormat
+		{
+			get { return wireFormat; }
+			set { wireFormat = value; }
+		}
+
+		private int requestTimeout = -1;
+		public int RequestTimeout
+		{
+			get { return requestTimeout; }
+			set { requestTimeout = value; }
+		}
+
+		#endregion
+
 		#region ITransportFactory Members
 
 		public ITransport CreateTransport(Uri location)
@@ -68,7 +85,7 @@
 			}
 
 			transport = new MutexTransport(transport);
-			transport = new ResponseCorrelator(transport);
+			transport = new ResponseCorrelator(transport, requestTimeout);
 
 			return transport;
 		}
@@ -79,7 +96,16 @@
 		{
 			// Looping through the AddressList allows different type of connections to be tried
 			// (IPv4, IPv6 and whatever else may be available).
+#if MONO
+			// The following GetHostByName() API has been obsoleted in .NET 2.0.  It has been
+			// superceded by GetHostEntry().  At some point, it will probably be removed
+			// from the Mono class library, and this #if statement can be removed.
+
+			IPHostEntry hostEntry = Dns.GetHostByName(host);
+#else
 			IPHostEntry hostEntry = Dns.GetHostEntry(host);
+#endif
+
 			foreach(IPAddress address in hostEntry.AddressList)
 			{
 				Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
@@ -95,19 +121,24 @@
 		protected IWireFormat CreateWireFormat(Uri location, StringDictionary map)
 		{
 			object properties = null;
-			IWireFormat wireFormat = null;
+			IWireFormat wireFormatItf = null;
 
 			// Detect STOMP etc
 			if(String.Compare(location.Scheme, "stomp", true) == 0)
 			{
-				wireFormat = new StompWireFormat();
-				properties = wireFormat;
+				this.wireFormat = "STOMP";
+			}
+
+			if(String.Compare(this.wireFormat, "stomp", true) == 0)
+			{
+				wireFormatItf = new StompWireFormat();
+				properties = wireFormatItf;
 			}
 			else
 			{
 				OpenWireFormat openwireFormat = new OpenWireFormat();
 
-				wireFormat = openwireFormat;
+				wireFormatItf = openwireFormat;
 				properties = openwireFormat.PreferedWireFormatInfo;
 			}
 
@@ -117,7 +148,7 @@
 				URISupport.SetProperties(properties, map, "wireFormat.");
 			}
 
-			return wireFormat;
+			return wireFormatItf;
 		}
 	}
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs?rev=627169&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/Convert.cs Tue Feb 12 15:00:40 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.ActiveMQ.Util
+{
+	class NMSConvert
+	{
+		public static AcknowledgementMode ToAcknowledgementMode(string ackText)
+		{
+			if(String.Compare(ackText, "AutoAcknowledge", true) == 0)
+			{
+				return AcknowledgementMode.AutoAcknowledge;
+			}
+			else if(String.Compare(ackText, "ClientAcknowledge", true) == 0)
+			{
+				return AcknowledgementMode.ClientAcknowledge;
+			}
+			else if(String.Compare(ackText, "DupsOkAcknowledge", true) == 0)
+			{
+				return AcknowledgementMode.DupsOkAcknowledge;
+			}
+			else if(String.Compare(ackText, "Transactional", true) == 0)
+			{
+				return AcknowledgementMode.Transactional;
+			}
+			else
+			{
+				return AcknowledgementMode.AutoAcknowledge;
+			}
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BadConsumeTest.cs Tue Feb 12 15:00:40 2008
@@ -14,21 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
 using NUnit.Framework;
-using System;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
 {
 	[TestFixture]
-    public class BadConsumeTest : NMS.Test.BadConsumeTest
+	public class BadConsumeTest_OpenWire : Apache.NMS.Test.BadConsumeTest
     {
         protected override IConnectionFactory CreateConnectionFactory()
         {
-            return new ConnectionFactory();
+        	return TestUtils.CreateOpenWireConnectionFactory();
         }
     }
+
+	[TestFixture]
+	public class BadConsumeTest_Stomp : Apache.NMS.Test.BadConsumeTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateStompConnectionFactory();
+		}
+	}
 }
 
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/BytesMessageTest.cs Tue Feb 12 15:00:40 2008
@@ -14,22 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
 using NUnit.Framework;
-using System;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
 {
     [TestFixture]
-    public class BytesMessageTest : NMS.Test.BytesMessageTest
+    public class BytesMessageTest_OpenWire : Apache.NMS.Test.BytesMessageTest
     {
         protected override IConnectionFactory CreateConnectionFactory()
         {
-            return new ConnectionFactory();
+        	return TestUtils.CreateOpenWireConnectionFactory();
         }
     }
-}
-
-
 
+	[TestFixture]
+	public class BytesMessageTest_Stomp : Apache.NMS.Test.BytesMessageTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateStompConnectionFactory();
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/CommandTest.cs Tue Feb 12 15:00:40 2008
@@ -19,7 +19,7 @@
 using System.Collections;
 
 
-namespace Apache.NMS.ActiveMQ.Commands
+namespace Apache.NMS.ActiveMQ.Test.Commands
 {
 	[TestFixture]
     public class CommandTest

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs Tue Feb 12 15:00:40 2008
@@ -20,7 +20,7 @@
 [assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
 [assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
 [assembly: AssemblyProductAttribute("Apache NMS for ActiveMQ Class Library")]
-[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2007 Apache Software Foundation")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2008 Apache Software Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
 [assembly: AssemblyVersionAttribute("1.0")]

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/ConsumerTest.cs Tue Feb 12 15:00:40 2008
@@ -14,22 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
 using NUnit.Framework;
-using System;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
 {
 	[TestFixture]
-    public class ConsumerTest : NMS.Test.ConsumerTest
+    public class ConsumerTest_OpenWire : Apache.NMS.Test.ConsumerTest
     {
         protected override IConnectionFactory CreateConnectionFactory()
         {
-            return new ConnectionFactory();
+        	return TestUtils.CreateOpenWireConnectionFactory();
         }
     }
-}
-
-
 
+	[TestFixture]
+	public class ConsumerTest_Stomp : Apache.NMS.Test.ConsumerTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateStompConnectionFactory();
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DurableTest.cs Tue Feb 12 15:00:40 2008
@@ -14,18 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
-using Apache.NMS;
 using NUnit.Framework;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
 {
     [TestFixture]
-    public class DurableTest : NMS.Test.DurableTest
+    public class DurableTest_OpenWire : Apache.NMS.Test.DurableTest
     {
         protected override IConnectionFactory CreateConnectionFactory()
         {
-            return new ConnectionFactory();
+        	return TestUtils.CreateOpenWireConnectionFactory();
         }
     }
+
+	[TestFixture]
+	public class DurableTest_Stomp : Apache.NMS.Test.DurableTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateStompConnectionFactory();
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MapMessageTest.cs Tue Feb 12 15:00:40 2008
@@ -14,22 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
 using NUnit.Framework;
-using System;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
 {
-	[ TestFixture ]
-    public class MapMessageTest : NMS.Test.MapMessageTest
-    {
-        protected override IConnectionFactory CreateConnectionFactory()
-        {
-            return new ConnectionFactory();
-        }
-	    
-    }
-}
+	[TestFixture]
+	public class MapMessageTest_OpenWire : Apache.NMS.Test.MapMessageTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateOpenWireConnectionFactory();
+		}
+	}
+
+	[TestFixture]
+	public class MapMessageTest_Stomp : Apache.NMS.Test.MapMessageTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateStompConnectionFactory();
+		}
 
+		public override void SendAndSyncReceive()
+		{
+			// TODO disable test
+		}
 
+		protected override void AssertValidMessage(IMessage message)
+		{
+			System.Console.WriteLine("Received MapMessage: " + message);
+
+			Assert.IsTrue(message is IMapMessage, "Did not receive a MapMessage!");
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs Tue Feb 12 15:00:40 2008
@@ -1,58 +1,52 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using Apache.NMS;
-using NUnit.Framework;
-using System.Reflection;
-using System.IO;
-
-namespace Apache.NMS.ActiveMQ
-{
-    [TestFixture]
-    public class NMSConnectionFactoryTest 
-    {
-        [Test]
-        public void TestTcpURI()
-        {
-            NMSConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
-            Assert.IsNotNull(factory);
-            Assert.IsNotNull(factory.ConnectionFactory);
-            Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
-        }
-        [Test]
-        public void TestStompURI()
-        {
-            NMSConnectionFactory factory = new NMSConnectionFactory("stomp://localhost:61616");
-            Assert.IsNotNull(factory);
-            Assert.IsNotNull(factory.ConnectionFactory);
-            Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
-        }
-
-        [Test]
-        public void TestActiveMQURI()
-        {
-            NMSConnectionFactory factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");
-            Assert.IsNotNull(factory);
-            Assert.IsNotNull(factory.ConnectionFactory);
-            Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
-        }
-
-    }
-
-
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using Apache.NMS;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    public class NMSConnectionFactoryTest 
+    {
+        [Test]
+        public void TestTcpURI()
+        {
+            NMSConnectionFactory factory = new NMSConnectionFactory("tcp://localhost:61616");
+            Assert.IsNotNull(factory);
+            Assert.IsNotNull(factory.ConnectionFactory);
+            Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+        }
+
+		[Test]
+        public void TestStompURI()
+        {
+            NMSConnectionFactory factory = new NMSConnectionFactory("stomp://localhost:61613");
+            Assert.IsNotNull(factory);
+            Assert.IsNotNull(factory.ConnectionFactory);
+            Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+        }
+
+        [Test]
+        public void TestActiveMQURI()
+        {
+            NMSConnectionFactory factory = new NMSConnectionFactory("activemq:tcp://localhost:61616");
+            Assert.IsNotNull(factory);
+            Assert.IsNotNull(factory.ConnectionFactory);
+            Assert.IsTrue(factory.ConnectionFactory is Apache.NMS.ActiveMQ.ConnectionFactory);
+        }
+    }
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSPropertyTest.cs Tue Feb 12 15:00:40 2008
@@ -14,23 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-//using Apache.NMS.ActiveMQ;
-using Apache.NMS.ActiveMQ;
-using Apache.NMS;
+
 using NUnit.Framework;
-using System;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.ActiveMQ.Test
 {
-    [ TestFixture ]
-    public class NMSPropertyTest : NMS.Test.NMSPropertyTest
+    [TestFixture]
+    public class NMSPropertyTest_OpenWire : Apache.NMS.Test.NMSPropertyTest
     {
         protected override IConnectionFactory CreateConnectionFactory()
         {
-            return new ConnectionFactory();
-        }
+			return TestUtils.CreateOpenWireConnectionFactory();
+		}
     }
-}
 
+	[TestFixture]
+	public class NMSPropertyTest_Stomp : Apache.NMS.Test.NMSPropertyTest
+	{
+		protected override IConnectionFactory CreateConnectionFactory()
+		{
+			return TestUtils.CreateStompConnectionFactory();
+		}
 
+		protected override void AssertNonStringProperties(IMessage message)
+		{
+			// lets disable typesafe property testing as right now Stomp does not support them
+		}
 
+		protected override void AssertReplyToValid(IMessage message)
+		{
+			Assert.IsNotNull(message.NMSReplyTo, "NMSReplyTo");
+			Assert.IsTrue(message.NMSReplyTo is ITemporaryQueue, "The reply to destination is not a TemporaryTopic!: " + message.NMSReplyTo);
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/BooleanStreamTest.cs Tue Feb 12 15:00:40 2008
@@ -17,8 +17,10 @@
 using NUnit.Framework;
 using System;
 using System.IO;
+using Apache.NMS.ActiveMQ.OpenWire;
 
-namespace Apache.NMS.ActiveMQ.OpenWire {
+namespace Apache.NMS.ActiveMQ.Test.OpenWire
+{
         [ TestFixture ]
         public class BooleanStreamTest
         {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/EndianTest.cs Tue Feb 12 15:00:40 2008
@@ -19,7 +19,7 @@
 using System;
 using System.IO;
 
-namespace Apache.NMS.ActiveMQ.OpenWire
+namespace Apache.NMS.ActiveMQ.Test.OpenWire
 {
     [TestFixture]
     public class EndianTest

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/OpenWire/PrimitiveMapTest.cs Tue Feb 12 15:00:40 2008
@@ -20,7 +20,7 @@
 using System.IO;
 using Apache.NMS.ActiveMQ.OpenWire;
 
-namespace Apache.NMS.ActiveMQ.OpenWire
+namespace Apache.NMS.ActiveMQ.Test.OpenWire
 {
     [TestFixture]
     public class PrimitiveMapTest

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/BadConsumeTest.cs Tue Feb 12 15:00:40 2008
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.NMS;
-using NUnit.Framework;
-using Apache.Stomp;
-using System;
-
-namespace Apache.Stomp
-{
-	[TestFixture]
-    public class BadConsumeTest : NMS.Test.BadConsumeTest
-    {
-        protected override IConnectionFactory CreateConnectionFactory()
-        {
-        	return StompTestUtils.CreateStompConnectionFactory();
-        }
-    }
-}
-
-

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs Tue Feb 12 15:00:40 2008
@@ -1,27 +0,0 @@
-using System;
-using System.Reflection;
-using System.Runtime.InteropServices;
-
-//------------------------------------------------------------------------------
-// <auto-generated>
-//     This code was generated by a tool.
-//     Runtime Version:2.0.50727.832
-//
-//     Changes to this file may cause incorrect behavior and will be lost if
-//     the code is regenerated.
-// </auto-generated>
-//------------------------------------------------------------------------------
-
-[assembly: ComVisibleAttribute(false)]
-[assembly: CLSCompliantAttribute(true)]
-[assembly: AssemblyTitleAttribute("Stomp .NET Tests")]
-[assembly: AssemblyDescriptionAttribute("Unit Tests for the NMS (.Net Messaging Library) using the STOMP protocol")]
-[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
-[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/")]
-[assembly: AssemblyProductAttribute("Apache ActiveMQ")]
-[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2007 Apache Software Foundation")]
-[assembly: AssemblyTrademarkAttribute("")]
-[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("4.0")]
-[assembly: AssemblyInformationalVersionAttribute("4.0")]
-

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/ConsumerTest.cs Tue Feb 12 15:00:40 2008
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.NMS;
-using NUnit.Framework;
-using Apache.Stomp;
-using System;
-
-namespace Apache.Stomp
-{
-    [TestFixture]
-    public class ConsumerTest : NMS.Test.ConsumerTest
-    {
-        protected override IConnectionFactory CreateConnectionFactory()
-        {
-			return StompTestUtils.CreateStompConnectionFactory();
-		}
-    }
-}
-
-
-

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/DurableTest.cs Tue Feb 12 15:00:40 2008
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using Apache.NMS;
-using NUnit.Framework;
-
-namespace Apache.Stomp
-{
-    [TestFixture]
-    public class DurableTest : NMS.Test.DurableTest
-    {
-        protected override IConnectionFactory CreateConnectionFactory()
-        {
-			return StompTestUtils.CreateStompConnectionFactory();
-		}
-    }
-}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/MapMessageTest.cs Tue Feb 12 15:00:40 2008
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.Stomp;
-using Apache.NMS;
-using NUnit.Framework;
-using System;
-
-namespace Apache.Stomp
-{
-	[ TestFixture ]
-    public class MapMessageTest : NMS.Test.MapMessageTest
-    {
-		public override void SendAndSyncReceive()
-        {
-            // TODO disable test
-        }
-		
-        protected override IConnectionFactory CreateConnectionFactory()
-        {
-			return StompTestUtils.CreateStompConnectionFactory();
-		}
-		
-        protected override void AssertValidMessage(IMessage message)
-        {
-            Console.WriteLine("Received MapMessage: " + message);
-			
-            Assert.IsTrue(message is IMapMessage, "Did not receive a MapMessage!");
-		}
-    }
-}
-
-

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs?rev=627169&r1=627168&r2=627169&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Stomp/NMSPropertyTest.cs Tue Feb 12 15:00:40 2008
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using Apache.NMS;
-using NUnit.Framework;
-using Apache.Stomp;
-using System;
-
-namespace Apache.Stomp
-{
-    [ TestFixture ]
-    public class NMSPropertyTest : NMS.Test.NMSPropertyTest
-    {
-        protected override IConnectionFactory CreateConnectionFactory()
-        {
-			return StompTestUtils.CreateStompConnectionFactory();
-		}
-		
-		protected override void AssertNonStringProperties(IMessage message)
-		{
-			// lets disable typesafe property testing as right now Stomp does not support them
-		}
-		
-		
-		protected override void AssertReplyToValid(IMessage message)
-		{
-			// TODO completely support temporary destinations in STOMP
-			
-			Assert.IsNotNull(message.NMSReplyTo, "NMSReplyTo");
-			Assert.IsTrue(message.NMSReplyTo is ITemporaryQueue, "The reply to destination is not a TemporaryTopic!: " + message.NMSReplyTo);
-		}
-    }
-}
-
-
-