You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/09/22 20:11:31 UTC

svn commit: r449031 - in /incubator/activemq/activemq-dotnet/trunk/src: main/csharp/ main/csharp/ActiveMQ/ main/csharp/ActiveMQ/Transport/ main/csharp/ActiveMQ/Transport/Tcp/ test/csharp/ test/csharp/NMS/

Author: jstrachan
Date: Fri Sep 22 11:11:30 2006
New Revision: 449031

URL: http://svn.apache.org/viewvc?view=rev&rev=449031
Log:
Tidied up the wire format negotiation code for C# together with disabling logging when testing and some minor cleanups of the code

Added:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
Modified:
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
    incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs
    incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs
    incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Fri Sep 22 11:11:30 2006
@@ -160,8 +160,8 @@
             if (response is ExceptionResponse)
             {
                 ExceptionResponse exceptionResponse = (ExceptionResponse) response;
-                // TODO include stack trace
-                throw new NMSException("Request failed: " + exceptionResponse);
+                BrokerError brokerError = exceptionResponse.Exception;
+                throw new BrokerException(brokerError);
             }
             return response;
         }

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs Fri Sep 22 11:11:30 2006
@@ -21,50 +21,50 @@
 namespace ActiveMQ.Transport
 {
 	
-	/// <summary>
-	/// A Transport which gaurds access to the next transport using a mutex.
-	/// </summary>
-	public class MutexTransport : TransportFilter
+    /// <summary>
+    /// A Transport which guards access to the next transport using a mutex.
+    /// </summary>
+    public class MutexTransport : TransportFilter
     {
-		
-		private readonly object transmissionLock = new object();
-		
-		public MutexTransport(ITransport next) : base(next) {
-		}
-		
-		
-		public override void Oneway(Command command)
-		{
+
+        private readonly object transmissionLock = new object();
+
+        public MutexTransport(ITransport next) : base(next) {
+        }
+
+        
+        public override void Oneway(Command command)
+        {
             lock (transmissionLock)
             {
-				this.next.Oneway(command);
-			}
-		}
-		
-		public override FutureResponse AsyncRequest(Command command)
-		{
+                this.next.Oneway(command);
+            }
+        }
+
+        public override FutureResponse AsyncRequest(Command command)
+        {
             lock (transmissionLock)
             {
-				return base.AsyncRequest(command);
-			}
-		}
-		
-		public override Response Request(Command command)
-		{
+                return base.AsyncRequest(command);
+            }
+        }
+
+        public override Response Request(Command command)
+        {
             lock (transmissionLock)
             {
-				return base.Request(command);
-			}
-		}
-		
-		public override void Dispose()
-		{
+                return base.Request(command);
+            }
+        }
+
+        public override void Dispose()
+        {
             lock (transmissionLock)
             {
-				base.Dispose();
-			}
-		}
-		
+                base.Dispose();
+            }
+        }
+
     }
 }
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs Fri Sep 22 11:11:30 2006
@@ -24,46 +24,46 @@
 namespace ActiveMQ.Transport
 {
 	
-	/// <summary>
-	/// A Transport which gaurds access to the next transport using a mutex.
-	/// </summary>
-	public class ResponseCorrelator : TransportFilter
+    /// <summary>
+    /// A Transport which correlates each incoming response with the pending request that is waiting for it.
+    /// </summary>
+    public class ResponseCorrelator : TransportFilter
     {
 
         private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable());
         private readonly Object mutex = new Object();
         private short nextCommandId;
-		
-		public ResponseCorrelator(ITransport next) : base(next) {
-		}
-
-		short GetNextCommandId() {
-			lock(mutex) {
-				return ++nextCommandId;
-			}
-		}
-		
-		public override void Oneway(Command command)
-		{
-			command.CommandId = GetNextCommandId();
-			command.ResponseRequired = false;
-			next.Oneway(command);
-		}
-		
-		public override FutureResponse AsyncRequest(Command command)
-		{
-			command.CommandId = GetNextCommandId();
-			command.ResponseRequired = true;
-			FutureResponse future = new FutureResponse();
-			requestMap[command.CommandId] = future;
-			next.Oneway(command);
-			return future;
-
-		}
-		
-		public override Response Request(Command command)
-		{
-			FutureResponse future = AsyncRequest(command);
+
+        public ResponseCorrelator(ITransport next) : base(next) {
+        }
+
+        short GetNextCommandId() {
+            lock(mutex) {
+                return ++nextCommandId;
+            }
+        }
+
+        public override void Oneway(Command command)
+        {
+            command.CommandId = GetNextCommandId();
+            command.ResponseRequired = false;
+            next.Oneway(command);
+        }
+
+        public override FutureResponse AsyncRequest(Command command)
+        {
+            command.CommandId = GetNextCommandId();
+            command.ResponseRequired = true;
+            FutureResponse future = new FutureResponse();
+            requestMap[command.CommandId] = future;
+            next.Oneway(command);
+            return future;
+
+        }
+
+        public override Response Request(Command command)
+        {
+            FutureResponse future = AsyncRequest(command);
             Response response = future.Response;
             if (response is ExceptionResponse)
             {
@@ -72,33 +72,41 @@
                 throw new BrokerException(brokerError);
             }
             return response;
-		}
-		
-		protected override void OnCommand(ITransport sender, Command command)
-		{
-			if( command is Response ) {
-				
-				Response response = (Response) command;
-				FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
-				if (future != null)
-				{
-					if (response is ExceptionResponse)
-					{
-						ExceptionResponse er = (ExceptionResponse) response;
-						BrokerError brokerError = er.Exception;
-						this.exceptionHandler(this, new BrokerException(brokerError));
-					}
-					future.Response = response;
-				}
-				else
-				{
-					Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
-				}
-			} else {
-				this.commandHandler(sender, command);
-			}
-		}
-		
+        }
+
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            if( command is Response ) {
+
+                Response response = (Response) command;
+                FutureResponse future = (FutureResponse) requestMap[response.CorrelationId];
+                if (future != null)
+                {
+                    if (response is ExceptionResponse)
+                    {
+                        ExceptionResponse er = (ExceptionResponse) response;
+                        BrokerError brokerError = er.Exception;
+                        BrokerException exception = new BrokerException(brokerError);
+                        this.exceptionHandler(this, exception);
+                    }
+                    future.Response = response;
+                }
+                else
+                {
+                    if (command is ShutdownInfo)
+                    {
+                        // let's shut down
+                        this.commandHandler(sender, command);
+                    }
+                    else {
+                        Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);
+                    }
+                }
+            } else {
+                this.commandHandler(sender, command);
+            }
+        }
+
     }
 }
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Fri Sep 22 11:11:30 2006
@@ -28,10 +28,10 @@
 namespace ActiveMQ.Transport.Tcp
 {
 	
-	/// <summary>
-	/// An implementation of ITransport that uses sockets to communicate with the broker
-	/// </summary>
-	public class TcpTransport : ITransport
+    /// <summary>
+    /// An implementation of ITransport that uses sockets to communicate with the broker
+    /// </summary>
+    public class TcpTransport : ITransport
     {
         private Socket socket;
         private OpenWireFormat wireformat = new OpenWireFormat();
@@ -46,7 +46,7 @@
         
         public TcpTransport(Socket socket)
         {
-			this.socket = socket;
+            this.socket = socket;
         }
         
         /// <summary>
@@ -56,11 +56,11 @@
         {
             if (!started)
             {
-				if( commandHandler == null )
-					throw new InvalidOperationException ("command cannot be null when Start is called.");
-				if( exceptionHandler == null )
-					throw new InvalidOperationException ("exception cannot be null when Start is called.");
-				
+                if( commandHandler == null )
+                    throw new InvalidOperationException ("command cannot be null when Start is called.");
+                if( exceptionHandler == null )
+                    throw new InvalidOperationException ("exception cannot be null when Start is called.");
+
                 started = true;
                 
                 NetworkStream networkStream = new NetworkStream(socket);
@@ -70,23 +70,13 @@
                 // now lets create the background read thread
                 readThread = new Thread(new ThreadStart(ReadLoop));
                 readThread.Start();
-                
-                // lets send the wireformat we're using
-				WireFormatInfo info = new WireFormatInfo();
-				info.StackTraceEnabled=false;
-				info.TightEncodingEnabled=false;
-				info.TcpNoDelayEnabled=false;
-				info.CacheEnabled=false;
-				info.SizePrefixDisabled=false;
-				
-                Oneway(info);
             }
         }
         
-		public void Oneway(Command command)
+        public void Oneway(Command command)
         {
-			wireformat.Marshal(command, socketWriter);
-			socketWriter.Flush();
+            wireformat.Marshal(command, socketWriter);
+            socketWriter.Flush();
         }
         
         public FutureResponse AsyncRequest(Command command)
@@ -101,9 +91,9 @@
         
         public void Dispose()
         {
-			closed = true;
-			socket.Close();
-			readThread.Join();
+            closed = true;
+            socket.Close();
+            readThread.Join();
             socketWriter.Close();
             socketReader.Close();
         }
@@ -115,36 +105,36 @@
                 try
                 {
                     Command command = (Command) wireformat.Unmarshal(socketReader);
-					this.commandHandler(this, command);
+                    this.commandHandler(this, command);
                 }
-				catch (ObjectDisposedException)
+                catch (ObjectDisposedException)
                 {
                     break;
                 }
-				catch ( Exception e) {
-					if( e.GetBaseException() is ObjectDisposedException ) {
-						break;
-					}
-					if( !closed ) {
-						this.exceptionHandler(this,e);
-					}
-					break;
-				}
+                catch ( Exception e) {
+                    if( e.GetBaseException() is ObjectDisposedException ) {
+                        break;
+                    }
+                    if( !closed ) {
+                        this.exceptionHandler(this,e);
+                    }
+                    break;
+                }
             }
         }
                 
         // Implementation methods
                 
-		public CommandHandler Command {
+        public CommandHandler Command {
             get { return commandHandler; }
             set { this.commandHandler = value; }
         }
-		
+
         public  ExceptionHandler Exception {
             get { return exceptionHandler; }
             set { this.exceptionHandler = value; }
         }
-		
+
     }
 }
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Fri Sep 22 11:11:30 2006
@@ -18,6 +18,8 @@
 using System;
 using System.Net;
 using System.Net.Sockets;
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire;
 using ActiveMQ.Transport;
 
 namespace ActiveMQ.Transport.Tcp {
@@ -47,6 +49,8 @@
             }
             rc = new ResponseCorrelator(rc);
             rc = new MutexTransport(rc);
+            rc = new WireFormatNegotiator(rc);
+
             return rc;
         }
 

Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?view=auto&rev=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (added)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Fri Sep 22 11:11:30 2006
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using ActiveMQ.Commands;
+using ActiveMQ.Transport;
+using System;
+
+namespace ActiveMQ.Transport
+{
+	
+    /// <summary>
+    /// A Transport which negotiates the wire format
+    /// </summary>
+    public class WireFormatNegotiator : TransportFilter
+    {
+        public const int OPENWIRE_VERSION = 2;
+
+
+        public WireFormatNegotiator(ITransport next) : base(next) {
+        }
+        
+        public override void Start() {
+            base.Start();
+
+
+            // now let's start the protocol negotiation
+            WireFormatInfo info = new WireFormatInfo();
+            info.StackTraceEnabled=false;
+            info.TightEncodingEnabled=false;
+            info.TcpNoDelayEnabled=false;
+            info.CacheEnabled=false;
+            info.SizePrefixDisabled=false;
+            info.Version = OPENWIRE_VERSION;
+
+            Oneway(info);
+        }
+    }
+}
+

Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/CommonAssemblyInfo.cs Fri Sep 22 11:11:30 2006
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
@@ -38,6 +22,6 @@
 [assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("4.0.2406.0")]
+[assembly: AssemblyVersionAttribute("4.0.2456.0")]
 [assembly: AssemblyInformationalVersionAttribute("4.0")]
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/CommonAssemblyInfo.cs Fri Sep 22 11:11:30 2006
@@ -1,19 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
@@ -38,6 +22,6 @@
 [assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("4.0.2406.0")]
+[assembly: AssemblyVersionAttribute("4.0.2456.0")]
 [assembly: AssemblyInformationalVersionAttribute("4.0")]
 

Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs?view=diff&rev=449031&r1=449030&r2=449031
==============================================================================
--- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs (original)
+++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/JMSTestSupport.cs Fri Sep 22 11:11:30 2006
@@ -24,32 +24,35 @@
 
 namespace NMS
 {
-	[ TestFixture ]
+    [ TestFixture ]
     public abstract class JMSTestSupport
     {
-        
-		private IConnectionFactory factory;
+
+        // enable/disable logging of message flows
+        protected bool logging = false;
+
+        private IConnectionFactory factory;
         private IConnection connection;
-		private ISession session;
-		private IDestination destination;
-		
-		protected int receiveTimeout = 1000;
-		protected string clientId;
-		protected DestinationType destinationType = DestinationType.Queue;
-		protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
-		
-		[SetUp]
+        private ISession session;
+        private IDestination destination;
+
+        protected int receiveTimeout = 1000;
+        protected string clientId;
+        protected DestinationType destinationType = DestinationType.Queue;
+        protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
+
+        [SetUp]
         virtual public void SetUp()
         {
         }
-		
+
         [TearDown]
         virtual public void TearDown()
         {
-			Disconnect();
+            Disconnect();
         }
-		
-		// Properties
+
+        // Properties
         public bool Connected
         {
             get { return connection!=null; }
@@ -59,57 +62,57 @@
         public IConnectionFactory Factory
         {
             get {
-				if( factory == null ) {
-					factory = CreateConnectionFactory();
-					Assert.IsNotNull(factory, "no factory created");
-				}
-				return factory;
-			}
+                if( factory == null ) {
+                    factory = CreateConnectionFactory();
+                    Assert.IsNotNull(factory, "no factory created");
+                }
+                return factory;
+            }
             set { this.factory = value; }
         }
-		
+
         public IConnection Connection
         {
             get {
-				if( connection == null ) {
-					Connect();
-				}
-				return connection;
-			}
+                if( connection == null ) {
+                    Connect();
+                }
+                return connection;
+            }
             set { this.connection = value; }
         }
-		
+
         public ISession Session
         {
             get {
-				if( session == null ) {
-					session = Connection.CreateSession(acknowledgementMode);
-					Assert.IsNotNull(connection != null, "no session created");
-				}
-				return session;
-			}
+                if( session == null ) {
+                    session = Connection.CreateSession(acknowledgementMode);
+                    Assert.IsNotNull(connection != null, "no session created");
+                }
+                return session;
+            }
             set { this.session = value; }
         }
-		
-		virtual protected void Connect()
+
+        virtual protected void Connect()
         {
-			Console.WriteLine("Connectting...");
-			connection = CreateConnection();
-			Assert.IsNotNull(connection, "no connection created");
-			connection.Start();
-			Console.WriteLine("Connected.");
-			Assert.IsNotNull(connection, "no connection created");
+            Console.WriteLine("Connectting...");
+            connection = CreateConnection();
+            Assert.IsNotNull(connection, "no connection created");
+            connection.Start();
+            Console.WriteLine("Connected.");
+            Assert.IsNotNull(connection, "no connection created");
         }
-		        
+
         virtual protected void Disconnect()
         {
             if (connection != null)
             {
-				Console.WriteLine("Disconnecting...");
+                Console.WriteLine("Disconnecting...");
                 connection.Dispose();
                 connection = null;
-				session=null;
-				Console.WriteLine("Disconnected.");
+                session=null;
+                Console.WriteLine("Disconnected.");
             }
         }
         
@@ -118,82 +121,82 @@
             Disconnect();
             Connect();
         }
-		
-		protected virtual void Drain()
-		{
+
+        protected virtual void Drain()
+        {
             using (ISession session = Connection.CreateSession())
             {
-				// Tries to consume any messages on the Destination
-				IMessageConsumer consumer = session.CreateConsumer(Destination);
-				
-				// Should only need to wait for first message to arrive due to the way
-				// prefetching works.
-				IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
-				while (msg != null)
-				{
-					msg = consumer.ReceiveNoWait();
-				}
-			}
-		}
-		
+                // Tries to consume any messages on the Destination
+                IMessageConsumer consumer = session.CreateConsumer(Destination);
+
+                // Should only need to wait for first message to arrive due to the way
+                // prefetching works.
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
+                while (msg != null)
+                {
+                    msg = consumer.ReceiveNoWait();
+                }
+            }
+        }
+
         public virtual void SendAndSyncReceive()
         {
             using (ISession session = Connection.CreateSession())
             {
-				
-				IMessageConsumer consumer = session.CreateConsumer(Destination);
-				IMessageProducer producer = session.CreateProducer(Destination);
-				
-				IMessage request = CreateMessage();
-				producer.Send(request);
-				
-				IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
+
+                IMessageConsumer consumer = session.CreateConsumer(Destination);
+                IMessageProducer producer = session.CreateProducer(Destination);
+
+                IMessage request = CreateMessage();
+                producer.Send(request);
+
+                IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(receiveTimeout));
                 Assert.IsNotNull(message, "No message returned!");
                 AssertValidMessage(message);
             }
         }
-		
-		protected virtual IConnectionFactory CreateConnectionFactory()
-		{
-			return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616?logging=true"));
-		}
-		
-		protected virtual IConnection CreateConnection()
-		{
-			IConnection connection =  Factory.CreateConnection();
-			if( clientId!=null ) {
-				connection.ClientId = clientId;
-			}
-			return connection;
-		}
-		
-		protected virtual IMessageProducer CreateProducer()
-		{
-			IMessageProducer producer = Session.CreateProducer(Destination);
-			return producer;
-		}
-		
-		protected virtual IMessageConsumer CreateConsumer()
-		{
-			IMessageConsumer consumer = Session.CreateConsumer(Destination);
-			return consumer;
-		}
+
+        protected virtual IConnectionFactory CreateConnectionFactory()
+        {
+            return new ActiveMQ.ConnectionFactory(new Uri("tcp://localhost:61616?logging=" + logging));
+        }
+
+        protected virtual IConnection CreateConnection()
+        {
+            IConnection connection =  Factory.CreateConnection();
+            if( clientId!=null ) {
+                connection.ClientId = clientId;
+            }
+            return connection;
+        }
+
+        protected virtual IMessageProducer CreateProducer()
+        {
+            IMessageProducer producer = Session.CreateProducer(Destination);
+            return producer;
+        }
+
+        protected virtual IMessageConsumer CreateConsumer()
+        {
+            IMessageConsumer consumer = Session.CreateConsumer(Destination);
+            return consumer;
+        }
         
         protected virtual IDestination CreateDestination()
         {
-			if( destinationType == DestinationType.Queue ) {
-				return Session.GetQueue(CreateDestinationName());
-			} else if( destinationType == DestinationType.Topic ) {
-				return Session.GetTopic(CreateDestinationName());
-			} else if( destinationType == DestinationType.TemporaryQueue ) {
-				return Session.CreateTemporaryQueue();
-			} else if( destinationType == DestinationType.TemporaryTopic ) {
-				return Session.CreateTemporaryTopic();
-			} else {
-				throw new Exception("Unknown destination type: "+destinationType);
-			}
+            if( destinationType == DestinationType.Queue ) {
+                return Session.GetQueue(CreateDestinationName());
+            } else if( destinationType == DestinationType.Topic ) {
+                return Session.GetTopic(CreateDestinationName());
+            } else if( destinationType == DestinationType.TemporaryQueue ) {
+                return Session.CreateTemporaryQueue();
+            } else if( destinationType == DestinationType.TemporaryTopic ) {
+                return Session.CreateTemporaryTopic();
+            } else {
+                throw new Exception("Unknown destination type: "+destinationType);
+            }
         }
-		
+
         protected virtual string CreateDestinationName()
         {
             return "Test.DotNet." + GetType().Name;
@@ -208,24 +211,24 @@
         {
             Assert.IsNotNull(message, "Null Message!");
         }
-		
-		
+
+
         public IDestination Destination
         {
             get {
-				if (destination == null)
-				{
-					destination = CreateDestination();
-					Assert.IsNotNull(destination, "No destination available!");
-					Console.WriteLine("Using destination: " + destination);
-				}
-				return destination;
-			}
+                if (destination == null)
+                {
+                    destination = CreateDestination();
+                    Assert.IsNotNull(destination, "No destination available!");
+                    Console.WriteLine("Using destination: " + destination);
+                }
+                return destination;
+            }
             set {
-				destination = value;
+                destination = value;
             }
         }
-		
+
     }
 }