You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/21 15:54:17 UTC
svn commit: r892840 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport:
InactivityMonitor.cs Tcp/TcpTransportFactory.cs
Author: tabish
Date: Mon Dec 21 14:54:17 2009
New Revision: 892840
URL: http://svn.apache.org/viewvc?rev=892840&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-212
Added user supplied inactivity monitor class with some modification to fix compiler warnings.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=892840&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs Mon Dec 21 14:54:17 2009
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport
+{
+ /// <summary>
+ /// This class makes sure that the connection is still alive,
+ /// by monitoring the reception of commands from the peer of
+ /// the transport.
+ /// </summary>
+ public class InactivityMonitor : TransportFilter
+ {
+ private Atomic<bool> monitorStarted = new Atomic<bool>(false);
+
+ private Atomic<bool> commandSent = new Atomic<bool>(false);
+ private Atomic<bool> commandReceived = new Atomic<bool>(false);
+
+ private Atomic<bool> failed = new Atomic<bool>(false);
+ private Atomic<bool> inRead = new Atomic<bool>(false);
+ private Atomic<bool> inWrite = new Atomic<bool>(false);
+
+ private Mutex monitor = new Mutex();
+
+ private Timer readCheckTimer;
+ private Timer writeCheckTimer;
+
+ private WriteChecker writeChecker;
+ private ReadChecker readChecker;
+
+ private long readCheckTime;
+ private long writeCheckTime;
+ private long initialDelayTime;
+
+ private Atomic<bool> keepAliveResponseRequired = new Atomic<bool>(false);
+ /// <summary>
+ /// Write-only switch that is copied into the ResponseRequired flag of
+ /// every KeepAliveInfo this monitor sends.
+ /// </summary>
+ public bool KeepAliveResponseRequired
+ {
+     set
+     {
+         keepAliveResponseRequired.Value = value;
+     }
+ }
+
+ // Local and remote Wire Format Information
+ private WireFormatInfo localWireFormatInfo;
+ private WireFormatInfo remoteWireFormatInfo;
+
+ /// <summary>
+ /// Constructor of the Inactivity Monitor.
+ /// </summary>
+ /// <param name="next">The transport wrapped by this filter; handed to the base class.</param>
+ public InactivityMonitor(ITransport next) : base(next)
+ {
+     Tracer.Debug("Creating Inactivity Monitor");
+ }
+
+ #region WriteCheck Related
+ /// <summary>
+ /// Periodic write-side check. Skipped entirely while a send is in
+ /// progress. Otherwise, when nothing was sent since the previous check,
+ /// a KeepAliveInfo is queued on the thread pool; in both cases the
+ /// "command sent" flag is cleared for the next interval.
+ /// </summary>
+ public void WriteCheck()
+ {
+     // A write is currently running, so the channel is obviously active.
+     if (inWrite.Value)
+     {
+         return;
+     }
+
+     if (commandSent.Value)
+     {
+         Tracer.Debug("Message sent since last write check. Resetting flag");
+     }
+     else
+     {
+         Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+         ThreadPool.QueueUserWorkItem(new WaitCallback(SendKeepAlive));
+     }
+
+     commandSent.Value = false;
+ }
+
+ /// <summary>
+ /// Thread pool work item that sends a KeepAliveInfo through Oneway,
+ /// provided the monitor is still started. An IOException raised by the
+ /// send is routed to OnException.
+ /// </summary>
+ /// <param name="state">Work item state; not used.</param>
+ private void SendKeepAlive(object state)
+ {
+     if (!monitorStarted.Value)
+     {
+         return;
+     }
+
+     try
+     {
+         KeepAliveInfo keepAlive = new KeepAliveInfo();
+         keepAlive.ResponseRequired = keepAliveResponseRequired.Value;
+         Oneway(keepAlive);
+     }
+     catch (IOException sendError)
+     {
+         OnException(this, sendError);
+     }
+ }
+ #endregion
+
+ #region ReadCheck Related
+ /// <summary>
+ /// Periodic read-side check. Does nothing while a receive is in
+ /// progress. Otherwise, if no command arrived since the previous check,
+ /// an inactivity exception is raised from a thread pool thread; if one
+ /// did arrive, the "command received" flag is cleared so that the next
+ /// interval starts fresh.
+ /// </summary>
+ public void ReadCheck()
+ {
+     if (inRead.Value)
+     {
+         Tracer.Debug("A receive is in progress");
+         return;
+     }
+
+     if (!commandReceived.Value)
+     {
+         Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+         ThreadPool.QueueUserWorkItem(new WaitCallback(SendInactivityException));
+     }
+     else
+     {
+         // Re-arm the flag for the next interval. OnCommand sets it back
+         // to true when traffic arrives; leaving it true here would mean
+         // inactivity could never be detected after the first command.
+         commandReceived.Value = false;
+     }
+ }
+
+ /// <summary>
+ /// Thread pool work item that reports the inactivity failure by passing
+ /// a new IOException to OnException.
+ /// </summary>
+ /// <param name="state">Work item state; not used.</param>
+ private void SendInactivityException(object state)
+ {
+     IOException inactivity = new IOException("Channel was inactive for too long.");
+     OnException(this, inactivity);
+ }
+
+ /// <summary>
+ /// Decides whether a read check may run now: it is allowed only when
+ /// more than 90% of the read check time has elapsed, otherwise the
+ /// read check is skipped.
+ /// </summary>
+ /// <param name="elapsed">Elapsed time supplied by the caller; compared directly against readCheckTime.</param>
+ /// <returns>true when elapsed is greater than readCheckTime * 9 / 10 (integer arithmetic); otherwise false.</returns>
+ public bool AllowReadCheck(long elapsed)
+ {
+     long threshold = readCheckTime * 9 / 10;
+     return elapsed > threshold;
+ }
+ #endregion
+
+ /// <summary>
+ /// Shuts the monitor timers down first, then stops the wrapped transport.
+ /// </summary>
+ public override void Stop()
+ {
+     StopMonitorThreads();
+     next.Stop();
+ }
+
+ /// <summary>
+ /// Handles a command arriving from the wrapped transport. Marks the
+ /// channel as having received data, answers KeepAliveInfo commands that
+ /// request a response, records the remote WireFormatInfo (which may
+ /// start the monitor timers), and finally forwards the command to the
+ /// base class. The "in read" flag is held for the duration of the call.
+ /// </summary>
+ /// <param name="sender">Transport that delivered the command.</param>
+ /// <param name="command">The command that was received.</param>
+ protected override void OnCommand(ITransport sender, Command command)
+ {
+     commandReceived.Value = true;
+     inRead.Value = true;
+     try
+     {
+         KeepAliveInfo keepAlive = command as KeepAliveInfo;
+         WireFormatInfo wireInfo = command as WireFormatInfo;
+
+         if (keepAlive != null)
+         {
+             if (keepAlive.ResponseRequired)
+             {
+                 try
+                 {
+                     // Echo the same object back with the request bit cleared.
+                     keepAlive.ResponseRequired = false;
+                     Oneway(keepAlive);
+                 }
+                 catch (IOException sendError)
+                 {
+                     OnException(this, sendError);
+                 }
+             }
+         }
+         else if (wireInfo != null)
+         {
+             lock (monitor)
+             {
+                 remoteWireFormatInfo = wireInfo;
+                 try
+                 {
+                     StartMonitorThreads();
+                 }
+                 catch (IOException startError)
+                 {
+                     OnException(this, startError);
+                 }
+             }
+         }
+
+         base.OnCommand(sender, command);
+     }
+     finally
+     {
+         inRead.Value = false;
+     }
+ }
+
+ /// <summary>
+ /// Sends a command through the wrapped transport. Sends are serialized
+ /// on the inWrite object because this class calls the method from more
+ /// than one thread and the transports below do not synchronize it. The
+ /// "in write" flag is held while the send runs, and the "command sent"
+ /// flag is set afterwards whether or not the send succeeded.
+ /// </summary>
+ /// <param name="command">The command to send.</param>
+ /// <exception cref="IOException">Thrown when the monitor has already failed.</exception>
+ public override void Oneway(Command command)
+ {
+     lock (inWrite)
+     {
+         // Write checks are skipped while this flag is set.
+         inWrite.Value = true;
+         try
+         {
+             if (failed.Value)
+             {
+                 string remote = next.RemoteAddress.ToString();
+                 throw new IOException("Channel was inactive for too long: " + remote);
+             }
+
+             WireFormatInfo wireInfo = command as WireFormatInfo;
+             if (wireInfo != null)
+             {
+                 lock (monitor)
+                 {
+                     localWireFormatInfo = wireInfo;
+                     StartMonitorThreads();
+                 }
+             }
+
+             next.Oneway(command);
+         }
+         finally
+         {
+             commandSent.Value = true;
+             inWrite.Value = false;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Reports a transport failure once. Only the first call flips the
+ /// failed flag; that call logs the exception, stops the monitor timers
+ /// and forwards the exception to the base class. Later calls are ignored.
+ /// </summary>
+ /// <param name="sender">Transport on which the failure occurred.</param>
+ /// <param name="command">The exception being reported.</param>
+ protected override void OnException(ITransport sender, Exception command)
+ {
+     if (!failed.CompareAndSet(false, true))
+     {
+         return;
+     }
+
+     Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
+     StopMonitorThreads();
+     base.OnException(sender, command);
+ }
+
+ /// <summary>
+ /// Starts the read and write check timers once both the local and the
+ /// remote WireFormatInfo are known. The read check period is the smaller
+ /// of the two MaxInactivityDuration values; nothing is started when that
+ /// value is not positive. The write check period is a third of the read
+ /// check period (or the same value when it is 3 or less).
+ /// </summary>
+ private void StartMonitorThreads()
+ {
+     lock (monitor)
+     {
+         bool negotiated = localWireFormatInfo != null && remoteWireFormatInfo != null;
+         if (monitorStarted.Value || !negotiated)
+         {
+             return;
+         }
+
+         readCheckTime = Math.Min(
+             localWireFormatInfo.MaxInactivityDuration,
+             remoteWireFormatInfo.MaxInactivityDuration);
+         initialDelayTime = Math.Min(
+             localWireFormatInfo.MaxInactivityDurationInitialDelay,
+             remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+
+         if (readCheckTime <= 0)
+         {
+             return;
+         }
+
+         monitorStarted.Value = true;
+         writeChecker = new WriteChecker(this);
+         readChecker = new ReadChecker(this);
+
+         if (readCheckTime > 3)
+         {
+             writeCheckTime = readCheckTime / 3;
+         }
+         else
+         {
+             writeCheckTime = readCheckTime;
+         }
+
+         TimerCallback onWriteTick = new TimerCallback(writeChecker.Check);
+         writeCheckTimer = new Timer(onWriteTick, null, initialDelayTime, writeCheckTime);
+
+         TimerCallback onReadTick = new TimerCallback(readChecker.Check);
+         readCheckTimer = new Timer(onReadTick, null, initialDelayTime, readCheckTime);
+     }
+ }
+
+ /// <summary>
+ /// Disposes both check timers if the monitor is currently started, and
+ /// marks it as stopped. Does nothing when the monitor was not started.
+ /// </summary>
+ private void StopMonitorThreads()
+ {
+     lock (monitor)
+     {
+         if (!monitorStarted.CompareAndSet(true, false))
+         {
+             return;
+         }
+
+         readCheckTimer.Dispose();
+         writeCheckTimer.Dispose();
+     }
+ }
+ }
+
+ /// <summary>
+ /// Timer callback adapter that forwards every tick to
+ /// InactivityMonitor.WriteCheck on the owning monitor.
+ /// </summary>
+ class WriteChecker
+ {
+     private readonly InactivityMonitor parent;
+
+     /// <summary>
+     /// Creates a checker bound to the given monitor.
+     /// </summary>
+     /// <param name="parent">The monitor whose WriteCheck is invoked; must not be null.</param>
+     /// <exception cref="ArgumentNullException">Thrown when parent is null.</exception>
+     public WriteChecker(InactivityMonitor parent)
+     {
+         if (parent == null)
+         {
+             // NullReferenceException is reserved for the runtime; a bad
+             // argument is reported with ArgumentNullException instead.
+             throw new ArgumentNullException("parent", "WriteChecker created with a NULL parent.");
+         }
+         this.parent = parent;
+     }
+
+     /// <summary>
+     /// Timer callback; runs the parent's write check.
+     /// </summary>
+     /// <param name="state">Timer state object; not used.</param>
+     public void Check(object state)
+     {
+         this.parent.WriteCheck();
+     }
+ }
+
+ /// <summary>
+ /// Timer callback adapter that runs InactivityMonitor.ReadCheck on the
+ /// owning monitor, but only when the monitor's AllowReadCheck accepts
+ /// the time elapsed since the previous accepted run.
+ /// </summary>
+ class ReadChecker
+ {
+     private readonly InactivityMonitor parent;
+
+     // Timestamp of the last accepted run. Starts at 0, so the elapsed
+     // value computed for the first tick is the full current timestamp.
+     private long lastRunTime;
+
+     /// <summary>
+     /// Creates a checker bound to the given monitor.
+     /// </summary>
+     /// <param name="parent">The monitor whose ReadCheck is invoked; must not be null.</param>
+     /// <exception cref="ArgumentNullException">Thrown when parent is null.</exception>
+     public ReadChecker(InactivityMonitor parent)
+     {
+         if (parent == null)
+         {
+             // NullReferenceException is reserved for the runtime; a bad
+             // argument is reported with ArgumentNullException instead.
+             throw new ArgumentNullException("parent", "ReadChecker created with a null parent");
+         }
+         this.parent = parent;
+     }
+
+     /// <summary>
+     /// Timer callback; runs the parent's read check unless too little
+     /// time has passed since the previous accepted run.
+     /// </summary>
+     /// <param name="state">Timer state object; not used.</param>
+     public void Check(object state)
+     {
+         long now = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
+         long elapsed = now - lastRunTime;
+         if (!parent.AllowReadCheck(elapsed))
+         {
+             return;
+         }
+         lastRunTime = now;
+
+         // Invoke the parent check routine.
+         this.parent.ReadCheck();
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=892840&r1=892839&r2=892840&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Mon Dec 21 14:54:17 2009
@@ -25,350 +25,365 @@
namespace Apache.NMS.ActiveMQ.Transport.Tcp
{
- public class TcpTransportFactory : ITransportFactory
- {
- public TcpTransportFactory()
- {
- }
-
- #region Properties
-
- private bool useLogging = false;
- public bool UseLogging
- {
- get { return useLogging; }
- set { useLogging = value; }
- }
-
- /// <summary>
- /// Size in bytes of the receive buffer.
- /// </summary>
- private int receiveBufferSize = 8192;
- public int ReceiveBufferSize
- {
- get { return receiveBufferSize; }
- set { receiveBufferSize = value; }
- }
-
- /// <summary>
- /// Size in bytes of send buffer.
- /// </summary>
- private int sendBufferSize = 8192;
- public int SendBufferSize
- {
- get { return sendBufferSize; }
- set { sendBufferSize = value; }
- }
-
- /// <summary>
- /// The time-out value, in milliseconds. The default value is 0, which indicates
- /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
- /// </summary>
- private int receiveTimeout = 0;
- public int ReceiveTimeout
- {
- get { return receiveTimeout; }
- set { receiveTimeout = value; }
- }
-
- /// <summary>
- /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
- /// the value will be changed to 500. The default value is 0, which indicates an infinite
- /// time-out period. Specifying -1 also indicates an infinite time-out period.
- /// </summary>
- private int sendTimeout = 0;
- public int SendTimeout
- {
- get { return sendTimeout; }
- set { sendTimeout = value; }
- }
-
- private string wireFormat = "OpenWire";
- public string WireFormat
- {
- get { return wireFormat; }
- set { wireFormat = value; }
- }
-
- private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
- public int RequestTimeout
- {
- get { return (int) requestTimeout.TotalMilliseconds; }
- set { requestTimeout = TimeSpan.FromMilliseconds(value); }
- }
-
- #endregion
-
- #region ITransportFactory Members
-
- public ITransport CompositeConnect(Uri location)
- {
- return CompositeConnect(location, null);
- }
-
- public ITransport CompositeConnect(Uri location, SetTransport setTransport)
- {
- // Extract query parameters from broker Uri
- StringDictionary map = URISupport.ParseQuery(location.Query);
+ public class TcpTransportFactory : ITransportFactory
+ {
+ public TcpTransportFactory()
+ {
+ }
+
+ #region Properties
+
+ private bool useLogging = false;
+ public bool UseLogging
+ {
+ get { return useLogging; }
+ set { useLogging = value; }
+ }
+
+ /// <summary>
+ /// Should the Inactivity Monitor be enabled on this Transport.
+ /// </summary>
+ private bool useInactivityMonitor = true;
+ public bool UseInactivityMonitor
+ {
+ get { return this.useInactivityMonitor; }
+ set { this.useInactivityMonitor = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of the receive buffer.
+ /// </summary>
+ private int receiveBufferSize = 8192;
+ public int ReceiveBufferSize
+ {
+ get { return receiveBufferSize; }
+ set { receiveBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of send buffer.
+ /// </summary>
+ private int sendBufferSize = 8192;
+ public int SendBufferSize
+ {
+ get { return sendBufferSize; }
+ set { sendBufferSize = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. The default value is 0, which indicates
+ /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int receiveTimeout = 0;
+ public int ReceiveTimeout
+ {
+ get { return receiveTimeout; }
+ set { receiveTimeout = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+ /// the value will be changed to 500. The default value is 0, which indicates an infinite
+ /// time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int sendTimeout = 0;
+ public int SendTimeout
+ {
+ get { return sendTimeout; }
+ set { sendTimeout = value; }
+ }
+
+ private string wireFormat = "OpenWire";
+ public string WireFormat
+ {
+ get { return wireFormat; }
+ set { wireFormat = value; }
+ }
+
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+ public int RequestTimeout
+ {
+ get { return (int) requestTimeout.TotalMilliseconds; }
+ set { requestTimeout = TimeSpan.FromMilliseconds(value); }
+ }
+
+ #endregion
+
+ #region ITransportFactory Members
+
+ public ITransport CompositeConnect(Uri location)
+ {
+ return CompositeConnect(location, null);
+ }
+
+ public ITransport CompositeConnect(Uri location, SetTransport setTransport)
+ {
+ // Extract query parameters from broker Uri
+ StringDictionary map = URISupport.ParseQuery(location.Query);
- // Set transport. properties on this (the factory)
- URISupport.SetProperties(this, map, "transport.");
+ // Set transport. properties on this (the factory)
+ URISupport.SetProperties(this, map, "transport.");
- Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
- Socket socket = Connect(location.Host, location.Port);
+ Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+ Socket socket = Connect(location.Host, location.Port);
#if !NETCF
- socket.ReceiveBufferSize = ReceiveBufferSize;
- socket.SendBufferSize = SendBufferSize;
- socket.ReceiveTimeout = ReceiveTimeout;
- socket.SendTimeout = SendTimeout;
+ socket.ReceiveBufferSize = ReceiveBufferSize;
+ socket.SendBufferSize = SendBufferSize;
+ socket.ReceiveTimeout = ReceiveTimeout;
+ socket.SendTimeout = SendTimeout;
#endif
- IWireFormat wireformat = CreateWireFormat(map);
- ITransport transport = new TcpTransport(location, socket, wireformat);
+ IWireFormat wireformat = CreateWireFormat(map);
+ ITransport transport = new TcpTransport(location, socket, wireformat);
- wireformat.Transport = transport;
+ wireformat.Transport = transport;
- if(UseLogging)
- {
- transport = new LoggingTransport(transport);
- }
-
- if(wireformat is OpenWireFormat)
- {
- transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
- }
-
- transport.RequestTimeout = this.requestTimeout;
-
- if(setTransport != null)
- {
- setTransport(transport, location);
- }
-
- return transport;
- }
-
- public ITransport CreateTransport(Uri location)
- {
- ITransport transport = CompositeConnect(location);
-
- transport = new MutexTransport(transport);
- transport = new ResponseCorrelator(transport);
- transport.RequestTimeout = this.requestTimeout;
-
- return transport;
- }
-
- #endregion
-
- // DISCUSSION: Caching host entries may not be the best strategy when using the
- // failover protocol. The failover protocol needs to be very dynamic when looking
- // up hostnames at runtime. If old hostname->IP mappings are kept around, this may
- // lead to runtime failures that could have been avoided by dynamically looking up
- // the new hostname IP.
+ if(UseLogging)
+ {
+ transport = new LoggingTransport(transport);
+ }
+
+ if(UseInactivityMonitor)
+ {
+ transport = new InactivityMonitor(transport);
+ }
+
+ if(wireformat is OpenWireFormat)
+ {
+ transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
+ }
+
+ transport.RequestTimeout = this.requestTimeout;
+
+ if(setTransport != null)
+ {
+ setTransport(transport, location);
+ }
+
+ return transport;
+ }
+
+ public ITransport CreateTransport(Uri location)
+ {
+ ITransport transport = CompositeConnect(location);
+
+ transport = new MutexTransport(transport);
+ transport = new ResponseCorrelator(transport);
+ transport.RequestTimeout = this.requestTimeout;
+
+ return transport;
+ }
+
+ #endregion
+
+ // DISCUSSION: Caching host entries may not be the best strategy when using the
+ // failover protocol. The failover protocol needs to be very dynamic when looking
+ // up hostnames at runtime. If old hostname->IP mappings are kept around, this may
+ // lead to runtime failures that could have been avoided by dynamically looking up
+ // the new hostname IP.
#if CACHE_HOSTENTRIES
- private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
- private static readonly object _syncLock = new object();
+ private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
+ private static readonly object _syncLock = new object();
#endif
- public static IPHostEntry GetIPHostEntry(string host)
- {
- IPHostEntry ipEntry;
+ public static IPHostEntry GetIPHostEntry(string host)
+ {
+ IPHostEntry ipEntry;
#if CACHE_HOSTENTRIES
- string hostUpperName = host.ToUpper();
+ string hostUpperName = host.ToUpper();
- lock (_syncLock)
- {
- if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
- {
- try
- {
- ipEntry = Dns.GetHostEntry(hostUpperName);
- CachedIPHostEntries.Add(hostUpperName, ipEntry);
- }
- catch
- {
- ipEntry = null;
- }
- }
- }
+ lock (_syncLock)
+ {
+ if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
+ {
+ try
+ {
+ ipEntry = Dns.GetHostEntry(hostUpperName);
+ CachedIPHostEntries.Add(hostUpperName, ipEntry);
+ }
+ catch
+ {
+ ipEntry = null;
+ }
+ }
+ }
#else
- try
- {
- ipEntry = Dns.GetHostEntry(host);
- }
- catch
- {
- ipEntry = null;
- }
+ try
+ {
+ ipEntry = Dns.GetHostEntry(host);
+ }
+ catch
+ {
+ ipEntry = null;
+ }
#endif
- return ipEntry;
- }
+ return ipEntry;
+ }
- private Socket ConnectSocket(IPAddress address, int port)
- {
- if(null != address)
- {
- try
- {
- Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
-
- if(null != socket)
- {
- socket.Connect(new IPEndPoint(address, port));
- if(socket.Connected)
- {
- return socket;
- }
- }
- }
- catch
- {
- }
- }
+ private Socket ConnectSocket(IPAddress address, int port)
+ {
+ if(null != address)
+ {
+ try
+ {
+ Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+ if(null != socket)
+ {
+ socket.Connect(new IPEndPoint(address, port));
+ if(socket.Connected)
+ {
+ return socket;
+ }
+ }
+ }
+ catch
+ {
+ }
+ }
- return null;
- }
+ return null;
+ }
- public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
- {
+ public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+ {
#if !NETCF
- return IPAddress.TryParse(host, out ipaddress);
+ return IPAddress.TryParse(host, out ipaddress);
#else
- try
- {
- ipaddress = IPAddress.Parse(host);
- }
- catch
- {
- ipaddress = null;
- }
+ try
+ {
+ ipaddress = IPAddress.Parse(host);
+ }
+ catch
+ {
+ ipaddress = null;
+ }
- return (null != ipaddress);
+ return (null != ipaddress);
#endif
- }
+ }
- public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
- {
- IPAddress ipaddress = null;
- IPHostEntry hostEntry = GetIPHostEntry(hostname);
-
- if(null != hostEntry)
- {
- ipaddress = GetIPAddress(hostEntry, addressFamily);
- }
-
- return ipaddress;
- }
-
- public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
- {
- if(null != hostEntry)
- {
- foreach(IPAddress address in hostEntry.AddressList)
- {
- if(address.AddressFamily == addressFamily)
- {
- return address;
- }
- }
- }
-
- return null;
- }
-
- protected Socket Connect(string host, int port)
- {
- Socket socket = null;
- IPAddress ipaddress;
-
- try
- {
- if(TryParseIPAddress(host, out ipaddress))
- {
- socket = ConnectSocket(ipaddress, port);
- }
- else
- {
- // Looping through the AddressList allows different type of connections to be tried
- // (IPv6, IPv4 and whatever else may be available).
- IPHostEntry hostEntry = GetIPHostEntry(host);
-
- if(null != hostEntry)
- {
- // Prefer IPv6 first.
- ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
- socket = ConnectSocket(ipaddress, port);
- if(null == socket)
- {
- // Try IPv4 next.
- ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
- socket = ConnectSocket(ipaddress, port);
- if(null == socket)
- {
- // Try whatever else there is.
- foreach(IPAddress address in hostEntry.AddressList)
- {
- if(AddressFamily.InterNetworkV6 == address.AddressFamily
- || AddressFamily.InterNetwork == address.AddressFamily)
- {
- // Already tried these protocols.
- continue;
- }
-
- socket = ConnectSocket(address, port);
- if(null != socket)
- {
- ipaddress = address;
- break;
- }
- }
- }
- }
- }
- }
-
- if(null == socket)
- {
- throw new SocketException();
- }
- }
- catch(Exception ex)
- {
- throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
- }
-
- Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
- return socket;
- }
-
- protected IWireFormat CreateWireFormat(StringDictionary map)
- {
- object properties = null;
- IWireFormat wireFormatItf = null;
-
- if(String.Compare(this.wireFormat, "stomp", true) == 0)
- {
- wireFormatItf = new StompWireFormat();
- properties = wireFormatItf;
- }
- else
- {
- OpenWireFormat openwireFormat = new OpenWireFormat();
-
- wireFormatItf = openwireFormat;
- properties = openwireFormat.PreferedWireFormatInfo;
- }
-
- if(null != properties)
- {
- // Set wireformat. properties on the wireformat owned by the tcpTransport
- URISupport.SetProperties(properties, map, "wireFormat.");
- }
-
- return wireFormatItf;
- }
- }
+ public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+ {
+ IPAddress ipaddress = null;
+ IPHostEntry hostEntry = GetIPHostEntry(hostname);
+
+ if(null != hostEntry)
+ {
+ ipaddress = GetIPAddress(hostEntry, addressFamily);
+ }
+
+ return ipaddress;
+ }
+
+ public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+ {
+ if(null != hostEntry)
+ {
+ foreach(IPAddress address in hostEntry.AddressList)
+ {
+ if(address.AddressFamily == addressFamily)
+ {
+ return address;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ protected Socket Connect(string host, int port)
+ {
+ Socket socket = null;
+ IPAddress ipaddress;
+
+ try
+ {
+ if(TryParseIPAddress(host, out ipaddress))
+ {
+ socket = ConnectSocket(ipaddress, port);
+ }
+ else
+ {
+ // Looping through the AddressList allows different type of connections to be tried
+ // (IPv6, IPv4 and whatever else may be available).
+ IPHostEntry hostEntry = GetIPHostEntry(host);
+
+ if(null != hostEntry)
+ {
+ // Prefer IPv6 first.
+ ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
+ socket = ConnectSocket(ipaddress, port);
+ if(null == socket)
+ {
+ // Try IPv4 next.
+ ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
+ socket = ConnectSocket(ipaddress, port);
+ if(null == socket)
+ {
+ // Try whatever else there is.
+ foreach(IPAddress address in hostEntry.AddressList)
+ {
+ if(AddressFamily.InterNetworkV6 == address.AddressFamily
+ || AddressFamily.InterNetwork == address.AddressFamily)
+ {
+ // Already tried these protocols.
+ continue;
+ }
+
+ socket = ConnectSocket(address, port);
+ if(null != socket)
+ {
+ ipaddress = address;
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if(null == socket)
+ {
+ throw new SocketException();
+ }
+ }
+ catch(Exception ex)
+ {
+ throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
+ }
+
+ Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
+ return socket;
+ }
+
+ protected IWireFormat CreateWireFormat(StringDictionary map)
+ {
+ object properties = null;
+ IWireFormat wireFormatItf = null;
+
+ if(String.Compare(this.wireFormat, "stomp", true) == 0)
+ {
+ wireFormatItf = new StompWireFormat();
+ properties = wireFormatItf;
+ }
+ else
+ {
+ OpenWireFormat openwireFormat = new OpenWireFormat();
+
+ wireFormatItf = openwireFormat;
+ properties = openwireFormat.PreferedWireFormatInfo;
+ }
+
+ if(null != properties)
+ {
+ // Set wireformat. properties on the wireformat owned by the tcpTransport
+ URISupport.SetProperties(properties, map, "wireFormat.");
+ }
+
+ return wireFormatItf;
+ }
+ }
}