You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/21 15:54:17 UTC

svn commit: r892840 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport: InactivityMonitor.cs Tcp/TcpTransportFactory.cs

Author: tabish
Date: Mon Dec 21 14:54:17 2009
New Revision: 892840

URL: http://svn.apache.org/viewvc?rev=892840&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-212

Added user supplied inactivity monitor class with some modification to fix compiler warnings.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=892840&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs Mon Dec 21 14:54:17 2009
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+    /// <summary>
+    /// This class make sure that the connection is still alive,
+    /// by monitoring the reception of commands from the peer of
+    /// the transport.
+    /// </summary>
+    public class InactivityMonitor : TransportFilter
+    {
+        private Atomic<bool> monitorStarted = new Atomic<bool>(false);
+
+        private Atomic<bool> commandSent = new Atomic<bool>(false);
+        private Atomic<bool> commandReceived = new Atomic<bool>(false);
+
+        private Atomic<bool> failed = new Atomic<bool>(false);
+        private Atomic<bool> inRead = new Atomic<bool>(false);
+        private Atomic<bool> inWrite = new Atomic<bool>(false);
+
+        private Mutex monitor = new Mutex();
+
+        private Timer readCheckTimer;
+        private Timer writeCheckTimer;
+
+        private WriteChecker writeChecker;
+        private ReadChecker readChecker;
+
+        private long readCheckTime;
+        private long writeCheckTime;
+        private long initialDelayTime;
+
+        private Atomic<bool> keepAliveResponseRequired = new Atomic<bool>(false);
+        public bool KeepAliveResponseRequired
+        {
+            set { keepAliveResponseRequired.Value = value; }
+        }
+
+        // Local and remote Wire Format Information
+        private WireFormatInfo localWireFormatInfo;
+        private WireFormatInfo remoteWireFormatInfo;
+
+        /// <summary>
+        /// Constructor or the Inactivity Monitor
+        /// </summary>
+        /// <param name="next"></param>
+        public InactivityMonitor(ITransport next)
+            : base(next)
+        {
+            Tracer.Debug("Creating Inactivity Monitor");
+        }
+
+        #region WriteCheck Related
+        /// <summary>
+        /// Check the write to the broker
+        /// </summary>
+        public void WriteCheck()
+        {
+            if (inWrite.Value)
+            {
+                return;
+            }
+            if (!commandSent.Value)
+            {
+                Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+                ThreadPool.QueueUserWorkItem(new WaitCallback(SendKeepAlive));
+            }
+            else
+            {
+                Tracer.Debug("Message sent since last write check. Resetting flag");
+            }
+            commandSent.Value = false;
+        }
+
+        private void SendKeepAlive(object state)
+        {
+            if (monitorStarted.Value)
+            {
+                try
+                {
+                    KeepAliveInfo info = new KeepAliveInfo();
+                    info.ResponseRequired = keepAliveResponseRequired.Value;
+                    Oneway(info);
+                }
+                catch (IOException exception)
+                {
+                    OnException(this, exception);
+                }
+            }
+        }
+        #endregion
+
+        #region ReadCheck Related
+        public void ReadCheck()
+        {
+            if (inRead.Value)
+            {
+                Tracer.Debug("A receive is in progress");
+                return;
+            }
+            if (!commandReceived.Value)
+            {
+                Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+                ThreadPool.QueueUserWorkItem(new WaitCallback(SendInactivityException));
+            }
+            else
+            {
+                commandReceived.Value = true;
+            }
+        }
+
+        private void SendInactivityException(object state)
+        {
+            OnException(this, new IOException("Channel was inactive for too long."));
+        }
+
+        /// <summary>
+        /// Checks if we should allow the read check(if less than 90% of the read
+        /// check time elapsed then we dont do the readcheck
+        /// </summary>
+        /// <param name="elapsed"></param>
+        /// <returns></returns>
+        public bool AllowReadCheck(long elapsed)
+        {
+            return (elapsed > (readCheckTime * 9 / 10));
+        }
+        #endregion
+
+        public override void Stop()
+        {
+            StopMonitorThreads();
+            next.Stop();
+        }
+
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            commandReceived.Value = true;
+            inRead.Value = true;
+            try
+            {
+                if (command is KeepAliveInfo)
+                {
+                    KeepAliveInfo info = command as KeepAliveInfo;
+                    if (info.ResponseRequired)
+                    {
+                        try
+                        {
+                            info.ResponseRequired = false;
+                            Oneway(info);
+                        }
+                        catch (IOException ex)
+                        {
+                            OnException(this, ex);
+                        }
+                    }
+                }
+                else if (command is WireFormatInfo)
+                {
+                    lock (monitor)
+                    {
+                        remoteWireFormatInfo = command as WireFormatInfo;
+                        try
+                        {
+                            StartMonitorThreads();
+                        }
+                        catch (IOException ex)
+                        {
+                            OnException(this, ex);
+                        }
+                    }
+                }
+                base.OnCommand(sender, command);
+            }
+            finally
+            {
+                inRead.Value = false;
+            }
+        }
+
+        public override void Oneway(Command command)
+        {
+            // Disable inactivity monitoring while processing a command.
+            //synchronize this method - its not synchronized
+            //further down the transport stack and gets called by more 
+            //than one thread  by this class
+            lock (inWrite)
+            {
+                inWrite.Value = true;
+                try
+                {
+                    if (failed.Value)
+                    {
+                        throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
+                    }
+                    if (command is WireFormatInfo)
+                    {
+                        lock (monitor)
+                        {
+                            localWireFormatInfo = command as WireFormatInfo;
+                            StartMonitorThreads();
+                        }
+                    }
+                    next.Oneway(command);
+                }
+                finally
+                {
+                    commandSent.Value = true;
+                    inWrite.Value = false;
+                }
+            }
+        }
+
+        protected override void OnException(ITransport sender, Exception command)
+        {
+            if (failed.CompareAndSet(false, true))
+            {
+                Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
+                StopMonitorThreads();
+                base.OnException(sender, command);
+            }
+        }
+
+        private void StartMonitorThreads()
+        {
+            lock (monitor)
+            {
+                if (monitorStarted.Value)
+                {
+                    return;
+                }
+                if (localWireFormatInfo == null)
+                {
+                    return;
+                }
+                if (remoteWireFormatInfo == null)
+                {
+                    return;
+                }
+
+                readCheckTime =
+                    Math.Min(
+                        localWireFormatInfo.MaxInactivityDuration,
+                        remoteWireFormatInfo.MaxInactivityDuration);
+                initialDelayTime =
+                    Math.Min(
+                        localWireFormatInfo.MaxInactivityDurationInitialDelay,
+                        remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+
+                if (readCheckTime > 0)
+                {
+                    monitorStarted.Value = true;
+                    writeChecker = new WriteChecker(this);
+                    readChecker = new ReadChecker(this);
+
+                    writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
+
+                    writeCheckTimer = new Timer(
+                        new TimerCallback(writeChecker.Check),
+                        null,
+                        initialDelayTime,
+                        writeCheckTime
+                        );
+                    readCheckTimer = new Timer(
+                        new TimerCallback(readChecker.Check),
+                        null,
+                        initialDelayTime,
+                        readCheckTime
+                        );
+                }
+            }
+        }
+
+        private void StopMonitorThreads()
+        {
+            lock (monitor)
+            {
+                if (monitorStarted.CompareAndSet(true, false))
+                {
+                    readCheckTimer.Dispose();
+                    writeCheckTimer.Dispose();
+                }
+            }
+        }
+    }
+
+    class WriteChecker
+    {
+        private readonly InactivityMonitor parent;
+
+        public WriteChecker(InactivityMonitor parent)
+        {
+            if (parent == null)
+            {
+                throw new NullReferenceException("WriteChecker created with a NULL parent.");
+            }
+            this.parent = parent;
+        }
+        public void Check(object state)
+        {
+            this.parent.WriteCheck();
+        }
+    }
+
+    class ReadChecker
+    {
+        private readonly InactivityMonitor parent;
+        private long lastRunTime;
+
+        public ReadChecker(InactivityMonitor parent)
+        {
+            if (parent == null)
+            {
+                throw new NullReferenceException("ReadChecker created with a null parent");
+            }
+            this.parent = parent;
+        }
+        public void Check(object state)
+        {
+            long now = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
+            long elapsed = now - lastRunTime;
+            if (!parent.AllowReadCheck(elapsed))
+            {
+                return;
+            }
+            lastRunTime = now;
+
+            // Invoke the parent check routine.
+            this.parent.ReadCheck();
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=892840&r1=892839&r2=892840&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Mon Dec 21 14:54:17 2009
@@ -25,350 +25,365 @@
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
-	public class TcpTransportFactory : ITransportFactory
-	{
-		public TcpTransportFactory()
-		{
-		}
-
-		#region Properties
-
-		private bool useLogging = false;
-		public bool UseLogging
-		{
-			get { return useLogging; }
-			set { useLogging = value; }
-		}
-
-		/// <summary>
-		/// Size in bytes of the receive buffer.
-		/// </summary>
-		private int receiveBufferSize = 8192;
-		public int ReceiveBufferSize
-		{
-			get { return receiveBufferSize; }
-			set { receiveBufferSize = value; }
-		}
-
-		/// <summary>
-		/// Size in bytes of send buffer.
-		/// </summary>
-		private int sendBufferSize = 8192;
-		public int SendBufferSize
-		{
-			get { return sendBufferSize; }
-			set { sendBufferSize = value; }
-		}
-
-		/// <summary>
-		/// The time-out value, in milliseconds. The default value is 0, which indicates
-		/// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
-		/// </summary>
-		private int receiveTimeout = 0;
-		public int ReceiveTimeout
-		{
-			get { return receiveTimeout; }
-			set { receiveTimeout = value; }
-		}
-
-		/// <summary>
-		/// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
-		/// the value will be changed to 500. The default value is 0, which indicates an infinite
-		/// time-out period. Specifying -1 also indicates an infinite time-out period.
-		/// </summary>
-		private int sendTimeout = 0;
-		public int SendTimeout
-		{
-			get { return sendTimeout; }
-			set { sendTimeout = value; }
-		}
-
-		private string wireFormat = "OpenWire";
-		public string WireFormat
-		{
-			get { return wireFormat; }
-			set { wireFormat = value; }
-		}
-
-		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-		public int RequestTimeout
-		{
-			get { return (int) requestTimeout.TotalMilliseconds; }
-			set { requestTimeout = TimeSpan.FromMilliseconds(value); }
-		}
-
-		#endregion
-
-		#region ITransportFactory Members
-
-		public ITransport CompositeConnect(Uri location)
-		{
-			return CompositeConnect(location, null);
-		}
-
-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-		{
-			// Extract query parameters from broker Uri
-			StringDictionary map = URISupport.ParseQuery(location.Query);
+    public class TcpTransportFactory : ITransportFactory
+    {
+        public TcpTransportFactory()
+        {
+        }
+
+        #region Properties
+
+        private bool useLogging = false;
+        public bool UseLogging
+        {
+            get { return useLogging; }
+            set { useLogging = value; }
+        }
+
+        /// <summary>
+        /// Should the Inactivity Monitor be enabled on this Transport.
+        /// </summary>
+        private bool useInactivityMonitor = true;
+        public bool UseInactivityMonitor
+        {
+           get { return this.useInactivityMonitor; }
+           set { this.useInactivityMonitor = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of the receive buffer.
+        /// </summary>
+        private int receiveBufferSize = 8192;
+        public int ReceiveBufferSize
+        {
+            get { return receiveBufferSize; }
+            set { receiveBufferSize = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of send buffer.
+        /// </summary>
+        private int sendBufferSize = 8192;
+        public int SendBufferSize
+        {
+            get { return sendBufferSize; }
+            set { sendBufferSize = value; }
+        }
+
+        /// <summary>
+        /// The time-out value, in milliseconds. The default value is 0, which indicates
+        /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+        /// </summary>
+        private int receiveTimeout = 0;
+        public int ReceiveTimeout
+        {
+            get { return receiveTimeout; }
+            set { receiveTimeout = value; }
+        }
+
+        /// <summary>
+        /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+        /// the value will be changed to 500. The default value is 0, which indicates an infinite
+        /// time-out period. Specifying -1 also indicates an infinite time-out period.
+        /// </summary>
+        private int sendTimeout = 0;
+        public int SendTimeout
+        {
+            get { return sendTimeout; }
+            set { sendTimeout = value; }
+        }
+
+        private string wireFormat = "OpenWire";
+        public string WireFormat
+        {
+            get { return wireFormat; }
+            set { wireFormat = value; }
+        }
+
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+        public int RequestTimeout
+        {
+            get { return (int) requestTimeout.TotalMilliseconds; }
+            set { requestTimeout = TimeSpan.FromMilliseconds(value); }
+        }
+
+        #endregion
+
+        #region ITransportFactory Members
+
+        public ITransport CompositeConnect(Uri location)
+        {
+            return CompositeConnect(location, null);
+        }
+
+        public ITransport CompositeConnect(Uri location, SetTransport setTransport)
+        {
+            // Extract query parameters from broker Uri
+            StringDictionary map = URISupport.ParseQuery(location.Query);
 
-			// Set transport. properties on this (the factory)
-			URISupport.SetProperties(this, map, "transport.");
+            // Set transport. properties on this (the factory)
+            URISupport.SetProperties(this, map, "transport.");
 
-			Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
-			Socket socket = Connect(location.Host, location.Port);
+            Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+            Socket socket = Connect(location.Host, location.Port);
 
 #if !NETCF
-			socket.ReceiveBufferSize = ReceiveBufferSize;
-			socket.SendBufferSize = SendBufferSize;
-			socket.ReceiveTimeout = ReceiveTimeout;
-			socket.SendTimeout = SendTimeout;
+            socket.ReceiveBufferSize = ReceiveBufferSize;
+            socket.SendBufferSize = SendBufferSize;
+            socket.ReceiveTimeout = ReceiveTimeout;
+            socket.SendTimeout = SendTimeout;
 #endif
 
-			IWireFormat wireformat = CreateWireFormat(map);
-			ITransport transport = new TcpTransport(location, socket, wireformat);
+            IWireFormat wireformat = CreateWireFormat(map);
+            ITransport transport = new TcpTransport(location, socket, wireformat);
 
-			wireformat.Transport = transport;
+            wireformat.Transport = transport;
 
-			if(UseLogging)
-			{
-				transport = new LoggingTransport(transport);
-			}
-
-			if(wireformat is OpenWireFormat)
-			{
-				transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
-			}
-
-			transport.RequestTimeout = this.requestTimeout;
-
-			if(setTransport != null)
-			{
-				setTransport(transport, location);
-			}
-
-			return transport;
-		}
-
-		public ITransport CreateTransport(Uri location)
-		{
-			ITransport transport = CompositeConnect(location);
-
-			transport = new MutexTransport(transport);
-			transport = new ResponseCorrelator(transport);
-			transport.RequestTimeout = this.requestTimeout;
-
-			return transport;
-		}
-
-		#endregion
-
-		// DISCUSSION: Caching host entries may not be the best strategy when using the
-		// failover protocol.  The failover protocol needs to be very dynamic when looking
-		// up hostnames at runtime.  If old hostname->IP mappings are kept around, this may
-		// lead to runtime failures that could have been avoided by dynamically looking up
-		// the new hostname IP.
+            if(UseLogging)
+            {
+                transport = new LoggingTransport(transport);
+            }
+
+            if(UseInactivityMonitor)
+            {
+               transport = new InactivityMonitor(transport);
+            }
+
+            if(wireformat is OpenWireFormat)
+            {
+                transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
+            }
+
+            transport.RequestTimeout = this.requestTimeout;
+
+            if(setTransport != null)
+            {
+                setTransport(transport, location);
+            }
+
+            return transport;
+        }
+
+        public ITransport CreateTransport(Uri location)
+        {
+            ITransport transport = CompositeConnect(location);
+
+            transport = new MutexTransport(transport);
+            transport = new ResponseCorrelator(transport);
+            transport.RequestTimeout = this.requestTimeout;
+
+            return transport;
+        }
+
+        #endregion
+
+        // DISCUSSION: Caching host entries may not be the best strategy when using the
+        // failover protocol.  The failover protocol needs to be very dynamic when looking
+        // up hostnames at runtime.  If old hostname->IP mappings are kept around, this may
+        // lead to runtime failures that could have been avoided by dynamically looking up
+        // the new hostname IP.
 #if CACHE_HOSTENTRIES
-		private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
-		private static readonly object _syncLock = new object();
+        private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
+        private static readonly object _syncLock = new object();
 #endif
-		public static IPHostEntry GetIPHostEntry(string host)
-		{
-			IPHostEntry ipEntry;
+        public static IPHostEntry GetIPHostEntry(string host)
+        {
+            IPHostEntry ipEntry;
 
 #if CACHE_HOSTENTRIES
-			string hostUpperName = host.ToUpper();
+            string hostUpperName = host.ToUpper();
 
-			lock (_syncLock)
-			{
-				if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
-				{
-					try
-					{
-						ipEntry = Dns.GetHostEntry(hostUpperName);
-						CachedIPHostEntries.Add(hostUpperName, ipEntry);
-					}
-					catch
-					{
-						ipEntry = null;
-					}
-				}
-			}
+            lock (_syncLock)
+            {
+                if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
+                {
+                    try
+                    {
+                        ipEntry = Dns.GetHostEntry(hostUpperName);
+                        CachedIPHostEntries.Add(hostUpperName, ipEntry);
+                    }
+                    catch
+                    {
+                        ipEntry = null;
+                    }
+                }
+            }
 #else
-			try
-			{
-				ipEntry = Dns.GetHostEntry(host);
-			}
-			catch
-			{
-				ipEntry = null;
-			}
+            try
+            {
+                ipEntry = Dns.GetHostEntry(host);
+            }
+            catch
+            {
+                ipEntry = null;
+            }
 #endif
 
-			return ipEntry;
-		}
+            return ipEntry;
+        }
 
-		private Socket ConnectSocket(IPAddress address, int port)
-		{
-			if(null != address)
-			{
-				try
-				{
-					Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
-
-					if(null != socket)
-					{
-						socket.Connect(new IPEndPoint(address, port));
-						if(socket.Connected)
-						{
-							return socket;
-						}
-					}
-				}
-				catch
-				{
-				}
-			}
+        private Socket ConnectSocket(IPAddress address, int port)
+        {
+            if(null != address)
+            {
+                try
+                {
+                    Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+                    if(null != socket)
+                    {
+                        socket.Connect(new IPEndPoint(address, port));
+                        if(socket.Connected)
+                        {
+                            return socket;
+                        }
+                    }
+                }
+                catch
+                {
+                }
+            }
 
-			return null;
-		}
+            return null;
+        }
 
-		public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
-		{
+        public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+        {
 #if !NETCF
-			return IPAddress.TryParse(host, out ipaddress);
+            return IPAddress.TryParse(host, out ipaddress);
 #else
-			try
-			{
-				ipaddress = IPAddress.Parse(host);
-			}
-			catch
-			{
-				ipaddress = null;
-			}
+            try
+            {
+                ipaddress = IPAddress.Parse(host);
+            }
+            catch
+            {
+                ipaddress = null;
+            }
 
-			return (null != ipaddress);
+            return (null != ipaddress);
 #endif
-		}
+        }
 
-		public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
-		{
-			IPAddress ipaddress = null;
-			IPHostEntry hostEntry = GetIPHostEntry(hostname);
-
-			if(null != hostEntry)
-			{
-				ipaddress = GetIPAddress(hostEntry, addressFamily);
-			}
-
-			return ipaddress;
-		}
-
-		public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
-		{
-			if(null != hostEntry)
-			{
-				foreach(IPAddress address in hostEntry.AddressList)
-				{
-					if(address.AddressFamily == addressFamily)
-					{
-						return address;
-					}
-				}
-			}
-
-			return null;
-		}
-
-		protected Socket Connect(string host, int port)
-		{
-			Socket socket = null;
-			IPAddress ipaddress;
-
-			try
-			{
-				if(TryParseIPAddress(host, out ipaddress))
-				{
-					socket = ConnectSocket(ipaddress, port);
-				}
-				else
-				{
-					// Looping through the AddressList allows different type of connections to be tried
-					// (IPv6, IPv4 and whatever else may be available).
-					IPHostEntry hostEntry = GetIPHostEntry(host);
-
-					if(null != hostEntry)
-					{
-						// Prefer IPv6 first.
-						ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
-						socket = ConnectSocket(ipaddress, port);
-						if(null == socket)
-						{
-							// Try IPv4 next.
-							ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
-							socket = ConnectSocket(ipaddress, port);
-							if(null == socket)
-							{
-								// Try whatever else there is.
-								foreach(IPAddress address in hostEntry.AddressList)
-								{
-									if(AddressFamily.InterNetworkV6 == address.AddressFamily
-										|| AddressFamily.InterNetwork == address.AddressFamily)
-									{
-										// Already tried these protocols.
-										continue;
-									}
-
-									socket = ConnectSocket(address, port);
-									if(null != socket)
-									{
-										ipaddress = address;
-										break;
-									}
-								}
-							}
-						}
-					}
-				}
-
-				if(null == socket)
-				{
-					throw new SocketException();
-				}
-			}
-			catch(Exception ex)
-			{
-				throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
-			}
-
-			Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
-			return socket;
-		}
-
-		protected IWireFormat CreateWireFormat(StringDictionary map)
-		{
-			object properties = null;
-			IWireFormat wireFormatItf = null;
-
-			if(String.Compare(this.wireFormat, "stomp", true) == 0)
-			{
-				wireFormatItf = new StompWireFormat();
-				properties = wireFormatItf;
-			}
-			else
-			{
-				OpenWireFormat openwireFormat = new OpenWireFormat();
-
-				wireFormatItf = openwireFormat;
-				properties = openwireFormat.PreferedWireFormatInfo;
-			}
-
-			if(null != properties)
-			{
-				// Set wireformat. properties on the wireformat owned by the tcpTransport
-				URISupport.SetProperties(properties, map, "wireFormat.");
-			}
-
-			return wireFormatItf;
-		}
-	}
+        public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+        {
+            IPAddress ipaddress = null;
+            IPHostEntry hostEntry = GetIPHostEntry(hostname);
+
+            if(null != hostEntry)
+            {
+                ipaddress = GetIPAddress(hostEntry, addressFamily);
+            }
+
+            return ipaddress;
+        }
+
+        public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+        {
+            if(null != hostEntry)
+            {
+                foreach(IPAddress address in hostEntry.AddressList)
+                {
+                    if(address.AddressFamily == addressFamily)
+                    {
+                        return address;
+                    }
+                }
+            }
+
+            return null;
+        }
+
+        protected Socket Connect(string host, int port)
+        {
+            Socket socket = null;
+            IPAddress ipaddress;
+
+            try
+            {
+                if(TryParseIPAddress(host, out ipaddress))
+                {
+                    socket = ConnectSocket(ipaddress, port);
+                }
+                else
+                {
+                    // Looping through the AddressList allows different type of connections to be tried
+                    // (IPv6, IPv4 and whatever else may be available).
+                    IPHostEntry hostEntry = GetIPHostEntry(host);
+
+                    if(null != hostEntry)
+                    {
+                        // Prefer IPv6 first.
+                        ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
+                        socket = ConnectSocket(ipaddress, port);
+                        if(null == socket)
+                        {
+                            // Try IPv4 next.
+                            ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
+                            socket = ConnectSocket(ipaddress, port);
+                            if(null == socket)
+                            {
+                                // Try whatever else there is.
+                                foreach(IPAddress address in hostEntry.AddressList)
+                                {
+                                    if(AddressFamily.InterNetworkV6 == address.AddressFamily
+                                        || AddressFamily.InterNetwork == address.AddressFamily)
+                                    {
+                                        // Already tried these protocols.
+                                        continue;
+                                    }
+
+                                    socket = ConnectSocket(address, port);
+                                    if(null != socket)
+                                    {
+                                        ipaddress = address;
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if(null == socket)
+                {
+                    throw new SocketException();
+                }
+            }
+            catch(Exception ex)
+            {
+                throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
+            }
+
+            Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
+            return socket;
+        }
+
+        protected IWireFormat CreateWireFormat(StringDictionary map)
+        {
+            object properties = null;
+            IWireFormat wireFormatItf = null;
+
+            if(String.Compare(this.wireFormat, "stomp", true) == 0)
+            {
+                wireFormatItf = new StompWireFormat();
+                properties = wireFormatItf;
+            }
+            else
+            {
+                OpenWireFormat openwireFormat = new OpenWireFormat();
+
+                wireFormatItf = openwireFormat;
+                properties = openwireFormat.PreferedWireFormatInfo;
+            }
+
+            if(null != properties)
+            {
+                // Set wireformat. properties on the wireformat owned by the tcpTransport
+                URISupport.SetProperties(properties, map, "wireFormat.");
+            }
+
+            return wireFormatItf;
+        }
+    }
 }