You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/04 00:11:06 UTC

svn commit: r886978 [2/2] - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./ Commands/ Protocol/ Transport/ Transport/Tcp/ Util/

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Tcp
+{
+    /// <summary>
+    /// An implementation of ITransport that uses sockets to communicate with the broker
+    /// </summary>
+    public class TcpTransport : ITransport
+    {
+        private readonly object myLock = new object();
+        private readonly Socket socket;
+        private IWireFormat wireformat;
+        private BinaryReader socketReader;
+        private BinaryWriter socketWriter;
+        private Thread readThread;
+        private bool started;
+        private bool disposed = false;
+        private Atomic<bool> closed = new Atomic<bool>(false);
+        private volatile bool seenShutdown;
+        private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+        private Uri connectedUri;
+
+        private CommandHandler commandHandler;
+        private ExceptionHandler exceptionHandler;
+        private InterruptedHandler interruptedHandler;
+        private ResumedHandler resumedHandler;
+        private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+        /// <summary>
+        /// Creates a transport over an already created socket. The constructor only stores
+        /// its arguments; the socket streams and the read thread are created by Start.
+        /// </summary>
+        /// <param name="uri">Address reported later through RemoteAddress.</param>
+        /// <param name="socket">Socket used for all reads and writes.</param>
+        /// <param name="wireformat">Wire format used to marshal and unmarshal commands.</param>
+        public TcpTransport(Uri uri, Socket socket, IWireFormat wireformat)
+        {
+            this.connectedUri = uri;
+            this.socket = socket;
+            this.wireformat = wireformat;
+        }
+
+        /// <summary>
+        /// Finalizer; delegates to Dispose(false).
+        /// </summary>
+        ~TcpTransport()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Starts the transport: wraps the socket in separate writer and reader streams and
+        /// launches the background thread that runs ReadLoop. Calling Start on a transport
+        /// that is already started does nothing.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when the Command or the Exception handler has not been assigned.
+        /// </exception>
+        public void Start()
+        {
+            lock(myLock)
+            {
+                if(started)
+                {
+                    return;
+                }
+
+                if(null == commandHandler)
+                {
+                    throw new InvalidOperationException(
+                            "command cannot be null when Start is called.");
+                }
+
+                if(null == exceptionHandler)
+                {
+                    throw new InvalidOperationException(
+                            "exception cannot be null when Start is called.");
+                }
+
+                // As reported in AMQ-988 it appears that NetworkStream is not thread safe
+                // so lets use an instance for each of the 2 streams
+                socketWriter = new EndianBinaryWriter(new NetworkStream(socket));
+                socketReader = new EndianBinaryReader(new NetworkStream(socket));
+
+                // Create the background read thread. The field is assigned before the thread
+                // is started so that Close can always find it.
+                Thread reader = new Thread(new ThreadStart(ReadLoop));
+                reader.IsBackground = true;
+                readThread = reader;
+                reader.Start();
+
+                // Flag the transport as started only after everything above succeeded, so a
+                // Start that threw can be retried instead of silently becoming a no-op.
+                started = true;
+            }
+        }
+
+        /// <summary>
+        /// Whether the transport is currently flagged as started. The flag is set by Start
+        /// and cleared by Close, and is read here under the transport lock.
+        /// </summary>
+        public bool IsStarted
+        {
+            get
+            {
+                lock(myLock)
+                {
+                    return started;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Marshals the command to the socket writer through the current wire format,
+        /// holding the transport lock for the duration of the write.
+        /// </summary>
+        /// <param name="command">The command to send.</param>
+        /// <exception cref="InvalidOperationException">Thrown when the transport has been closed.</exception>
+        public void Oneway(Command command)
+        {
+            lock(myLock)
+            {
+                if(closed.Value)
+                {
+                    throw new InvalidOperationException("Error writing to broker.  Transport connection is closed.");
+                }
+
+                if(command is ShutdownInfo)
+                {
+                    // Remember that a shutdown was sent: ReadLoop checks this flag and does not
+                    // report the following read failure to the exception handler.
+                    seenShutdown = true;
+                }
+
+                // NOTE(review): socketWriter is only assigned in Start(); if Oneway is called
+                // before Start, null is passed here - confirm callers always start first.
+                Wireformat.Marshal(command, socketWriter);
+            }
+        }
+
+        /// <summary>
+        /// Not supported by this transport; always throws NotImplementedException.
+        /// </summary>
+        public FutureResponse AsyncRequest(Command command)
+        {
+            throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
+        }
+
+        /// <summary>
+        /// Timeout stored in maxWait. Within this class it is only consulted by Close, when
+        /// deciding how long to wait for the read thread to exit.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return this.maxWait; }
+            set { this.maxWait = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the socket's NoDelay option. In NETCF builds the getter always
+        /// returns false and the setter does nothing.
+        /// </summary>
+        public bool TcpNoDelayEnabled
+        {
+#if !NETCF
+            get { return this.socket.NoDelay; }
+            set { this.socket.NoDelay = value; }
+#else
+            get { return false; }
+            set { }
+#endif
+        }
+
+        /// <summary>
+        /// Not supported by this transport; always throws NotImplementedException.
+        /// </summary>
+        public Response Request(Command command)
+        {
+            throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+        }
+
+        /// <summary>
+        /// Not supported by this transport; always throws NotImplementedException.
+        /// </summary>
+        public Response Request(Command command, TimeSpan timeout)
+        {
+            throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+        }
+
+        /// <summary>
+        /// Stops the transport by calling Close.
+        /// </summary>
+        public void Stop()
+        {
+            Close();
+        }
+
+        /// <summary>
+        /// Closes the transport: shuts down and closes the socket and its streams, then waits
+        /// for the read thread to exit and aborts it if it does not finish in time. Only the
+        /// first call does any work; later calls return immediately.
+        /// </summary>
+        public void Close()
+        {
+            if(!closed.CompareAndSet(false, true))
+            {
+                return;
+            }
+
+            Thread reader;
+
+            lock(myLock)
+            {
+                // Every step below is best effort: a failure while tearing down one resource
+                // must not prevent the remaining resources from being released.
+                try
+                {
+                    socket.Shutdown(SocketShutdown.Both);
+                }
+                catch
+                {
+                }
+
+                try
+                {
+                    if(null != socketWriter)
+                    {
+                        socketWriter.Close();
+                    }
+                }
+                catch
+                {
+                }
+                finally
+                {
+                    socketWriter = null;
+                }
+
+                try
+                {
+                    if(null != socketReader)
+                    {
+                        socketReader.Close();
+                    }
+                }
+                catch
+                {
+                }
+                finally
+                {
+                    socketReader = null;
+                }
+
+                try
+                {
+                    socket.Close();
+                }
+                catch
+                {
+                }
+
+                reader = readThread;
+                readThread = null;
+                started = false;
+            }
+
+            // Wait for the read thread outside the lock: a handler running on that thread may
+            // be blocked in Oneway() waiting for myLock, and joining while holding the lock
+            // would keep it from ever finishing.
+            if(null != reader && Thread.CurrentThread != reader
+#if !NETCF
+ && reader.IsAlive
+#endif
+)
+            {
+                // maxWait defaults to Timeout.Infinite (-1 ms), which compares as smaller than
+                // MAX_THREAD_WAIT. Treat negative values as "no preference" so the wait is
+                // always capped instead of turning into an unbounded Join(-1).
+                TimeSpan waitTime = MAX_THREAD_WAIT;
+
+                if(maxWait >= TimeSpan.Zero && maxWait < MAX_THREAD_WAIT)
+                {
+                    waitTime = maxWait;
+                }
+
+                if(!reader.Join((int) waitTime.TotalMilliseconds))
+                {
+                    reader.Abort();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Closes the transport and suppresses finalization of this instance.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Closes the transport and marks it as disposed.
+        /// </summary>
+        /// <param name="disposing">
+        /// True when called from Dispose(), false when called from the finalizer.
+        /// NOTE(review): the flag is not consulted, so the finalizer path runs the same
+        /// Close() as an explicit Dispose() - confirm this is intended.
+        /// </param>
+        protected void Dispose(bool disposing)
+        {
+            Close();
+            disposed = true;
+        }
+
+        /// <summary>
+        /// True once Dispose(bool) has run.
+        /// </summary>
+        public bool IsDisposed
+        {
+            get
+            {
+                return disposed;
+            }
+        }
+
+        /// <summary>
+        /// Body of the background read thread: unmarshals commands from the socket reader and
+        /// passes each one to the command handler until the transport is closed or a read fails.
+        /// </summary>
+        public void ReadLoop()
+        {
+            // This is the thread function for the reader thread. This runs continuously
+            // performing a blocking read on the socket and dispatching all commands
+            // received.
+            //
+            // Exception Handling
+            // ------------------
+            // If an Exception occurs during the reading/marshalling, then the connection
+            // is effectively broken because position cannot be re-established to the next
+            // message.  This is reported to the app via the exceptionHandler and the socket
+            // is closed to prevent further communication attempts.
+            //
+            // An exception in the command handler may not be fatal to the transport, so
+            // these are simply reported to the exceptionHandler.
+            //
+            while(!closed.Value)
+            {
+                Command command = null;
+
+                try
+                {
+                    command = (Command) Wireformat.Unmarshal(socketReader);
+                }
+                catch(Exception ex)
+                {
+                    command = null;
+                    // A read failure after Close() has already run is expected; only an
+                    // unexpected failure closes the transport and is reported.
+                    if(!closed.Value)
+                    {
+                        // Close the socket as there's little that can be done with this transport now.
+                        Close();
+                        // Stay silent when a ShutdownInfo was sent through Oneway.
+                        if(!seenShutdown)
+                        {
+                            this.exceptionHandler(this, ex);
+                        }
+                    }
+
+                    break;
+                }
+
+                try
+                {
+                    if(command != null)
+                    {
+                        this.commandHandler(this, command);
+                    }
+                }
+                catch(Exception e)
+                {
+                    this.exceptionHandler(this, e);
+                }
+            }
+        }
+
+        // Implementation methods
+
+        /// <summary>
+        /// Handler invoked by ReadLoop for every command read from the socket.
+        /// Start throws if it has not been assigned.
+        /// </summary>
+        public CommandHandler Command
+        {
+            get { return commandHandler; }
+            set { this.commandHandler = value; }
+        }
+
+        /// <summary>
+        /// Handler invoked by ReadLoop when reading fails or the command handler throws.
+        /// Start throws if it has not been assigned.
+        /// </summary>
+        public ExceptionHandler Exception
+        {
+            get { return exceptionHandler; }
+            set { this.exceptionHandler = value; }
+        }
+
+        /// <summary>
+        /// Interrupted handler. It is only stored; nothing in this class invokes it.
+        /// </summary>
+        public InterruptedHandler Interrupted
+        {
+            get { return interruptedHandler; }
+            set { this.interruptedHandler = value; }
+        }
+
+        /// <summary>
+        /// Resumed handler. It is only stored; nothing in this class invokes it.
+        /// </summary>
+        public ResumedHandler Resumed
+        {
+            get { return resumedHandler; }
+            set { this.resumedHandler = value; }
+        }
+
+        /// <summary>
+        /// Wire format used by Oneway to marshal and by ReadLoop to unmarshal commands.
+        /// </summary>
+        public IWireFormat Wireformat
+        {
+            get { return wireformat; }
+            set { wireformat = value; }
+        }
+
+        /// <summary>
+        /// Always false for this transport.
+        /// </summary>
+        public bool IsFaultTolerant
+        {
+            get { return false; }
+        }
+
+        /// <summary>
+        /// Reports the Connected state of the underlying socket.
+        /// </summary>
+        public bool IsConnected
+        {
+            get { return socket.Connected; }
+        }
+
+        /// <summary>
+        /// The URI that was passed to the constructor.
+        /// </summary>
+        public Uri RemoteAddress
+        {
+            get { return connectedUri; }
+        }
+
+        /// <summary>
+        /// Returns this transport when its runtime type is exactly the requested type,
+        /// otherwise null.
+        /// </summary>
+        /// <param name="type">The type being asked for.</param>
+        public Object Narrow(Type type)
+        {
+            bool exactMatch = this.GetType().Equals(type);
+            return exactMatch ? this : null;
+        }
+
+    }
+}
+
+
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using System.Net;
+using System.Net.Sockets;
+using Apache.NMS.Stomp.Transport.Stomp;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Tcp
+{
+    public class TcpTransportFactory : ITransportFactory
+    {
+        public TcpTransportFactory()
+        {
+        }
+
+        #region Properties
+
+        private bool useLogging = false;
+        /// <summary>
+        /// When true, CompositeConnect wraps the created transport in a LoggingTransport.
+        /// </summary>
+        public bool UseLogging
+        {
+            get { return useLogging; }
+            set { useLogging = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of the receive buffer.
+        /// </summary>
+        private int receiveBufferSize = 8192;
+        // Applied to the socket in CompositeConnect (skipped in NETCF builds).
+        public int ReceiveBufferSize
+        {
+            get { return receiveBufferSize; }
+            set { receiveBufferSize = value; }
+        }
+
+        /// <summary>
+        /// Size in bytes of send buffer.
+        /// </summary>
+        private int sendBufferSize = 8192;
+        // Applied to the socket in CompositeConnect (skipped in NETCF builds).
+        public int SendBufferSize
+        {
+            get { return sendBufferSize; }
+            set { sendBufferSize = value; }
+        }
+
+        /// <summary>
+        /// The time-out value, in milliseconds. The default value is 0, which indicates
+        /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+        /// </summary>
+        private int receiveTimeout = 0;
+        // Applied to the socket in CompositeConnect (skipped in NETCF builds).
+        public int ReceiveTimeout
+        {
+            get { return receiveTimeout; }
+            set { receiveTimeout = value; }
+        }
+
+        /// <summary>
+        /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+        /// the value will be changed to 500. The default value is 0, which indicates an infinite
+        /// time-out period. Specifying -1 also indicates an infinite time-out period.
+        /// </summary>
+        private int sendTimeout = 0;
+        // Applied to the socket in CompositeConnect (skipped in NETCF builds).
+        public int SendTimeout
+        {
+            get { return sendTimeout; }
+            set { sendTimeout = value; }
+        }
+
+        private string wireFormat = "Stomp";
+        /// <summary>
+        /// Name of the wire format; defaults to "Stomp".
+        /// NOTE(review): CreateWireFormat always builds a StompWireFormat and does not read
+        /// this value - confirm whether the property is meant to have an effect.
+        /// </summary>
+        public string WireFormat
+        {
+            get { return wireFormat; }
+            set { wireFormat = value; }
+        }
+
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+        /// <summary>
+        /// Request timeout in milliseconds. The value is stored as a TimeSpan and assigned to
+        /// the RequestTimeout of the transports this factory creates.
+        /// </summary>
+        public int RequestTimeout
+        {
+            get { return (int) requestTimeout.TotalMilliseconds; }
+            set { requestTimeout = TimeSpan.FromMilliseconds(value); }
+        }
+
+        #endregion
+
+        #region ITransportFactory Members
+
+        /// <summary>
+        /// Connects to the given location without a SetTransport callback.
+        /// </summary>
+        public ITransport CompositeConnect(Uri location)
+        {
+            return CompositeConnect(location, null);
+        }
+
+        /// <summary>
+        /// Opens a socket to the host and port of the location, wraps it in a TcpTransport
+        /// and applies the optional decorators (logging, wire format negotiation).
+        /// </summary>
+        /// <param name="location">Broker URI; its query string supplies the option values.</param>
+        /// <param name="setTransport">Optional callback invoked with the finished transport and the location.</param>
+        /// <returns>The outermost transport of the chain that was built.</returns>
+        public ITransport CompositeConnect(Uri location, SetTransport setTransport)
+        {
+            // Extract query parameters from broker Uri
+            StringDictionary map = URISupport.ParseQuery(location.Query);
+
+            // Set transport. properties on this (the factory)
+            URISupport.SetProperties(this, map, "transport.");
+
+            Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+            Socket socket = Connect(location.Host, location.Port);
+
+#if !NETCF
+            socket.ReceiveBufferSize = ReceiveBufferSize;
+            socket.SendBufferSize = SendBufferSize;
+            socket.ReceiveTimeout = ReceiveTimeout;
+            socket.SendTimeout = SendTimeout;
+#endif
+
+            IWireFormat wireformat = CreateWireFormat(map);
+            ITransport transport = new TcpTransport(location, socket, wireformat);
+
+            // The wire format keeps a reference to the raw TCP transport, not to the
+            // decorators added below.
+            wireformat.Transport = transport;
+
+            if(UseLogging)
+            {
+                transport = new LoggingTransport(transport);
+            }
+
+            // NOTE(review): CreateWireFormat in this class only ever returns a StompWireFormat,
+            // so this branch looks unreachable from here - confirm.
+            if(wireformat is OpenWireFormat)
+            {
+                transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
+            }
+
+            transport.RequestTimeout = this.requestTimeout;
+
+            if(setTransport != null)
+            {
+                setTransport(transport, location);
+            }
+
+            return transport;
+        }
+
+        /// <summary>
+        /// Builds a transport with CompositeConnect and wraps it in a MutexTransport and a
+        /// ResponseCorrelator, then applies the factory's request timeout to the result.
+        /// </summary>
+        public ITransport CreateTransport(Uri location)
+        {
+            ITransport transport = CompositeConnect(location);
+
+            transport = new MutexTransport(transport);
+            transport = new ResponseCorrelator(transport);
+            transport.RequestTimeout = this.requestTimeout;
+
+            return transport;
+        }
+
+        #endregion
+
+        // DISCUSSION: Caching host entries may not be the best strategy when using the
+        // failover protocol.  The failover protocol needs to be very dynamic when looking
+        // up hostnames at runtime.  If old hostname->IP mappings are kept around, this may
+        // lead to runtime failures that could have been avoided by dynamically looking up
+        // the new hostname IP.
+#if CACHE_HOSTENTRIES
+        private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
+        private static readonly object _syncLock = new object();
+#endif
+        /// <summary>
+        /// Resolves a host name through DNS. Any failure during the lookup is swallowed.
+        /// </summary>
+        /// <param name="host">Host name to resolve.</param>
+        /// <returns>The resolved entry, or null when the lookup threw.</returns>
+        public static IPHostEntry GetIPHostEntry(string host)
+        {
+            IPHostEntry ipEntry;
+
+#if CACHE_HOSTENTRIES
+            // Cache keys are the upper-cased host name, so lookups are case-insensitive.
+            string hostUpperName = host.ToUpper();
+
+            lock (_syncLock)
+            {
+                if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
+                {
+                    try
+                    {
+                        ipEntry = Dns.GetHostEntry(hostUpperName);
+                        CachedIPHostEntries.Add(hostUpperName, ipEntry);
+                    }
+                    catch
+                    {
+                        // Failed lookups are not cached; the next call tries again.
+                        ipEntry = null;
+                    }
+                }
+            }
+#else
+            try
+            {
+                ipEntry = Dns.GetHostEntry(host);
+            }
+            catch
+            {
+                // Resolution failure is reported to the caller as null.
+                ipEntry = null;
+            }
+#endif
+
+            return ipEntry;
+        }
+
+        /// <summary>
+        /// Makes a single attempt to open a TCP connection to the given address and port.
+        /// </summary>
+        /// <param name="address">Address to connect to; may be null.</param>
+        /// <param name="port">Port to connect to.</param>
+        /// <returns>The connected socket, or null when address is null or the attempt fails.</returns>
+        private Socket ConnectSocket(IPAddress address, int port)
+        {
+            if(null == address)
+            {
+                return null;
+            }
+
+            Socket socket = null;
+
+            try
+            {
+                socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+                socket.Connect(new IPEndPoint(address, port));
+                if(socket.Connected)
+                {
+                    return socket;
+                }
+            }
+            catch
+            {
+                // Best effort: a failed attempt is reported to the caller as null so that it
+                // can move on to the next candidate address.
+            }
+
+            // The attempt failed. Release the socket now instead of leaving its handle open
+            // until the finalizer runs; the caller may try several addresses in a row.
+            if(null != socket)
+            {
+                try
+                {
+                    socket.Close();
+                }
+                catch
+                {
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Tries to interpret the host string as a literal IP address.
+        /// </summary>
+        /// <param name="host">Text to parse.</param>
+        /// <param name="ipaddress">Receives the parsed address, or null when parsing fails.</param>
+        /// <returns>True when the text was parsed as an IP address.</returns>
+        public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+        {
+#if NETCF
+            // No IPAddress.TryParse in this build: emulate it with Parse.
+            try
+            {
+                ipaddress = IPAddress.Parse(host);
+                return (null != ipaddress);
+            }
+            catch
+            {
+                ipaddress = null;
+                return false;
+            }
+#else
+            bool parsed = IPAddress.TryParse(host, out ipaddress);
+            return parsed;
+#endif
+        }
+
+        /// <summary>
+        /// Resolves the host name and returns its first address of the requested family.
+        /// </summary>
+        /// <param name="hostname">Host name to resolve.</param>
+        /// <param name="addressFamily">Address family to look for.</param>
+        /// <returns>The matching address, or null when resolution fails or nothing matches.</returns>
+        public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+        {
+            IPHostEntry resolved = GetIPHostEntry(hostname);
+
+            if(null == resolved)
+            {
+                return null;
+            }
+
+            return GetIPAddress(resolved, addressFamily);
+        }
+
+        /// <summary>
+        /// Returns the first address in the host entry that belongs to the requested family.
+        /// </summary>
+        /// <param name="hostEntry">Resolved host entry; may be null.</param>
+        /// <param name="addressFamily">Address family to look for.</param>
+        /// <returns>The first matching address, or null when the entry is null or nothing matches.</returns>
+        public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+        {
+            if(null == hostEntry)
+            {
+                return null;
+            }
+
+            IPAddress[] candidates = hostEntry.AddressList;
+
+            for(int index = 0; index < candidates.Length; index++)
+            {
+                if(candidates[index].AddressFamily == addressFamily)
+                {
+                    return candidates[index];
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Opens a TCP connection to the host. A literal IP address is used directly; a host
+        /// name is resolved and its first IPv6 address is tried, then its first IPv4 address,
+        /// then every address of any other family.
+        /// </summary>
+        /// <param name="host">Host name or literal IP address.</param>
+        /// <param name="port">Port to connect to.</param>
+        /// <returns>The connected socket.</returns>
+        /// <exception cref="NMSConnectionException">
+        /// Thrown when no connection could be made; wraps the underlying exception.
+        /// </exception>
+        protected Socket Connect(string host, int port)
+        {
+            Socket connection = null;
+            IPAddress target;
+
+            try
+            {
+                if(TryParseIPAddress(host, out target))
+                {
+                    connection = ConnectSocket(target, port);
+                }
+                else
+                {
+                    // Looping through the AddressList allows different type of connections to be tried
+                    // (IPv6, IPv4 and whatever else may be available).
+                    IPHostEntry resolved = GetIPHostEntry(host);
+
+                    if(null != resolved)
+                    {
+                        // First choice: IPv6.
+                        target = GetIPAddress(resolved, AddressFamily.InterNetworkV6);
+                        connection = ConnectSocket(target, port);
+
+                        if(null == connection)
+                        {
+                            // Second choice: IPv4.
+                            target = GetIPAddress(resolved, AddressFamily.InterNetwork);
+                            connection = ConnectSocket(target, port);
+                        }
+
+                        if(null == connection)
+                        {
+                            // Last resort: any address family not attempted above.
+                            foreach(IPAddress candidate in resolved.AddressList)
+                            {
+                                bool alreadyTried =
+                                    AddressFamily.InterNetworkV6 == candidate.AddressFamily
+                                    || AddressFamily.InterNetwork == candidate.AddressFamily;
+
+                                if(!alreadyTried)
+                                {
+                                    connection = ConnectSocket(candidate, port);
+                                    if(null != connection)
+                                    {
+                                        target = candidate;
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if(null == connection)
+                {
+                    throw new SocketException();
+                }
+            }
+            catch(Exception ex)
+            {
+                throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
+            }
+
+            Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, target.AddressFamily.ToString());
+            return connection;
+        }
+
+        /// <summary>
+        /// Creates a StompWireFormat and applies the "wireFormat." prefixed options from the
+        /// map to it.
+        /// </summary>
+        /// <param name="map">Options parsed from the broker URI query string.</param>
+        /// <returns>The configured wire format.</returns>
+        protected IWireFormat CreateWireFormat(StringDictionary map)
+        {
+            IWireFormat stompFormat = new StompWireFormat();
+
+            // Set wireformat. properties on the wireformat owned by the tcpTransport
+            object target = stompFormat;
+            URISupport.SetProperties(target, map, "wireFormat.");
+
+            return stompFormat;
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Transport.Discovery;
+using Apache.NMS.Stomp.Transport.Failover;
+using Apache.NMS.Stomp.Transport.Mock;
+using Apache.NMS.Stomp.Transport.Tcp;
+
+namespace Apache.NMS.Stomp.Transport
+{
+    public class TransportFactory
+    {
+        public static event ExceptionListener OnException;
+
+        /// <summary>
+        /// Raises the OnException event with the given exception. Does nothing when no
+        /// listener is subscribed.
+        /// </summary>
+        /// <param name="ex">The exception to hand to the listeners.</param>
+        public static void HandleException(Exception ex)
+        {
+            // Copy the delegate first: a listener unsubscribing on another thread between the
+            // null check and the invocation would otherwise cause a NullReferenceException.
+            ExceptionListener listeners = TransportFactory.OnException;
+
+            if(listeners != null)
+            {
+                listeners(ex);
+            }
+        }
+
+        /// <summary>
+        /// Creates a normal transport: picks the factory for the URI scheme and returns the
+        /// result of its CreateTransport.
+        /// </summary>
+        /// <param name="location">Broker URI; its scheme selects the transport factory.</param>
+        /// <returns>the transport</returns>
+        public static ITransport CreateTransport(Uri location)
+        {
+            ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
+            return tf.CreateTransport(location);
+        }
+
+        /// <summary>
+        /// Picks the factory for the URI scheme and returns the result of its CompositeConnect.
+        /// </summary>
+        /// <param name="location">Broker URI; its scheme selects the transport factory.</param>
+        public static ITransport CompositeConnect(Uri location)
+        {
+            ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
+            return tf.CompositeConnect(location);
+        }
+
+        /// <summary>
+        /// Same as CompositeConnect, but passes the SetTransport callback on to the factory.
+        /// </summary>
+        /// <param name="location">Broker URI; its scheme selects the transport factory.</param>
+        /// <param name="setTransport">Callback handed to the factory's CompositeConnect.</param>
+        public static ITransport AsyncCompositeConnect(Uri location, SetTransport setTransport)
+        {
+            ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
+            return tf.CompositeConnect(location, setTransport);
+        }
+
+        /// <summary>
+        /// Create a transport factory for the scheme.  If we do not support the transport protocol,
+        /// an NMSConnectionException will be thrown.
+        /// </summary>
+        /// <param name="location">Broker URI; only its scheme is inspected. "tcp" is the only supported scheme.</param>
+        /// <returns>A new factory for the scheme.</returns>
+        /// <exception cref="NMSConnectionException">
+        /// Thrown when the scheme is empty or unsupported, or when creating the factory fails.
+        /// </exception>
+        private static ITransportFactory CreateTransportFactory(Uri location)
+        {
+            string scheme = location.Scheme;
+
+            if(null == scheme || 0 == scheme.Length)
+            {
+                throw new NMSConnectionException(String.Format("Transport scheme invalid: [{0}]", location.ToString()));
+            }
+
+            ITransportFactory factory = null;
+
+            try
+            {
+                switch(scheme.ToLower())
+                {
+                case "tcp":
+                    factory = new TcpTransportFactory();
+                    break;
+                default:
+                    throw new NMSConnectionException(String.Format("The transport {0} is not supported.", scheme));
+                }
+            }
+            catch(NMSConnectionException)
+            {
+                throw;
+            }
+            catch(Exception ex)
+            {
+                // Keep the original failure as the inner exception so the cause is not lost.
+                throw new NMSConnectionException("Error creating transport.", ex);
+            }
+
+            // Defensive check in case a future scheme branch leaves the factory unset.
+            if(null == factory)
+            {
+                throw new NMSConnectionException("Unable to create a transport.");
+            }
+
+            return factory;
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+
+namespace Apache.NMS.Stomp.Transport
+{
+	/// <summary>
+	/// Used to implement a filter on the transport layer.  A filter wraps another
+	/// <see cref="ITransport"/> (the "next" transport), registers itself as that
+	/// transport's command, exception, interrupted and resumed handler, and by
+	/// default forwards every operation to it.  Subclasses override the virtual
+	/// members to intercept traffic in either direction.
+	/// </summary>
+	public class TransportFilter : ITransport
+	{
+		protected readonly ITransport next;
+		protected CommandHandler commandHandler;
+		protected ExceptionHandler exceptionHandler;
+		protected InterruptedHandler interruptedHandler;
+		protected ResumedHandler resumedHandler;
+		private bool disposed = false;
+
+		/// <summary>
+		/// Wraps the given transport and hooks this filter's On* methods up as
+		/// the wrapped transport's event handlers.
+		/// </summary>
+		/// <param name="next">The transport this filter delegates to.</param>
+		/// <exception cref="ArgumentNullException">If next is null.</exception>
+		public TransportFilter(ITransport next)
+		{
+			// Fail with a named argument instead of a bare NullReferenceException
+			// on the first handler assignment below.
+			if(next == null)
+			{
+				throw new ArgumentNullException("next");
+			}
+
+			this.next = next;
+			this.next.Command = new CommandHandler(OnCommand);
+			this.next.Exception = new ExceptionHandler(OnException);
+			this.next.Interrupted = new InterruptedHandler(OnInterrupted);
+			this.next.Resumed = new ResumedHandler(OnResumed);
+		}
+
+		~TransportFilter()
+		{
+			Dispose(false);
+		}
+
+		/// <summary>
+		/// Passes a command received from the wrapped transport on to this
+		/// filter's command handler.  The handler is invoked unconditionally;
+		/// Start() refuses to run while it is unset.
+		/// </summary>
+		protected virtual void OnCommand(ITransport sender, Command command)
+		{
+			this.commandHandler(sender, command);
+		}
+
+		/// <summary>
+		/// Passes an exception reported by the wrapped transport on to this
+		/// filter's exception handler.  The handler is invoked unconditionally;
+		/// Start() refuses to run while it is unset.
+		/// </summary>
+		protected virtual void OnException(ITransport sender, Exception command)
+		{
+			this.exceptionHandler(sender, command);
+		}
+
+		/// <summary>
+		/// Forwards an interrupted notification to the registered handler, if any.
+		/// </summary>
+		protected virtual void OnInterrupted(ITransport sender)
+		{
+			if(this.interruptedHandler != null)
+			{
+				this.interruptedHandler(sender);
+			}
+		}
+
+		/// <summary>
+		/// Forwards a resumed notification to the registered handler, if any.
+		/// </summary>
+		protected virtual void OnResumed(ITransport sender)
+		{
+			if(this.resumedHandler != null)
+			{
+				this.resumedHandler(sender);
+			}
+		}
+
+		/// <summary>
+		/// Method Oneway.  Delegates to the wrapped transport.
+		/// </summary>
+		/// <param name="command">A  Command</param>
+		public virtual void Oneway(Command command)
+		{
+			this.next.Oneway(command);
+		}
+
+		/// <summary>
+		/// Method AsyncRequest.  Delegates to the wrapped transport.
+		/// </summary>
+		/// <returns>A FutureResponse</returns>
+		/// <param name="command">A  Command</param>
+		public virtual FutureResponse AsyncRequest(Command command)
+		{
+			return this.next.AsyncRequest(command);
+		}
+
+		/// <summary>
+		/// Property RequestTimeout.  Read from and written to the wrapped
+		/// transport; the filter keeps no copy of its own.
+		/// </summary>
+		public TimeSpan RequestTimeout
+		{
+			get { return this.next.RequestTimeout; }
+			set { this.next.RequestTimeout = value; }
+		}
+
+		/// <summary>
+		/// Method Request.  Issues the request using the current RequestTimeout.
+		/// </summary>
+		/// <returns>A Response</returns>
+		/// <param name="command">A  Command</param>
+		public virtual Response Request(Command command)
+		{
+			return Request(command, RequestTimeout);
+		}
+
+		/// <summary>
+		/// Method Request with time out for Response.  Delegates to the wrapped
+		/// transport.
+		/// </summary>
+		/// <returns>A Response</returns>
+		/// <param name="command">A  Command</param>
+		/// <param name="timeout">Timeout, passed through to the wrapped transport</param>
+		public virtual Response Request(Command command, TimeSpan timeout)
+		{
+			return this.next.Request(command, timeout);
+		}
+
+		/// <summary>
+		/// Method Start.  Starts the wrapped transport once both mandatory
+		/// handlers have been assigned.
+		/// </summary>
+		/// <exception cref="InvalidOperationException">
+		/// If the Command or Exception handler has not been set.
+		/// </exception>
+		public virtual void Start()
+		{
+			if(commandHandler == null)
+			{
+				throw new InvalidOperationException("command cannot be null when Start is called.");
+			}
+
+			if(exceptionHandler == null)
+			{
+				throw new InvalidOperationException("exception cannot be null when Start is called.");
+			}
+
+			this.next.Start();
+		}
+
+		/// <summary>
+		/// Property IsStarted.  Reports the wrapped transport's state.
+		/// </summary>
+		public bool IsStarted
+		{
+			get { return this.next.IsStarted; }
+		}
+
+		/// <summary>
+		/// Method Dispose.  Disposes the wrapped transport and suppresses
+		/// finalization of this filter.
+		/// </summary>
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		/// <summary>
+		/// Releases the wrapped transport.  Calling this again after it has
+		/// completed is a no-op, so the wrapped transport is disposed at most once.
+		/// </summary>
+		/// <param name="disposing">
+		/// True when called from Dispose(), false when called from the finalizer;
+		/// the wrapped transport is only touched in the former case.
+		/// </param>
+		protected virtual void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				this.next.Dispose();
+			}
+
+			disposed = true;
+		}
+
+		/// <summary>
+		/// True once Dispose (or the finalizer) has run for this filter.
+		/// </summary>
+		public bool IsDisposed
+		{
+			get
+			{
+				return disposed;
+			}
+		}
+
+		/// <summary>
+		/// Handler invoked by OnCommand.  Must be set before Start() is called.
+		/// </summary>
+		public CommandHandler Command
+		{
+			get { return commandHandler; }
+			set { this.commandHandler = value; }
+		}
+
+		/// <summary>
+		/// Handler invoked by OnException.  Must be set before Start() is called.
+		/// </summary>
+		public ExceptionHandler Exception
+		{
+			get { return exceptionHandler; }
+			set { this.exceptionHandler = value; }
+		}
+
+		/// <summary>
+		/// Optional handler invoked by OnInterrupted.
+		/// </summary>
+		public InterruptedHandler Interrupted
+		{
+			get { return interruptedHandler; }
+			set { this.interruptedHandler = value; }
+		}
+
+		/// <summary>
+		/// Optional handler invoked by OnResumed.
+		/// </summary>
+		public ResumedHandler Resumed
+		{
+			get { return resumedHandler; }
+			set { this.resumedHandler = value; }
+		}
+
+		/// <summary>
+		/// Method Stop.  The base implementation does nothing.
+		/// </summary>
+		public virtual void Stop()
+		{
+			// NOTE(review): unlike Start(), this does not forward to the wrapped
+			// transport - confirm that is intended before relying on it.
+		}
+
+		/// <summary>
+		/// Searches this filter and then the transports it wraps for an instance
+		/// whose runtime type is exactly the requested type.
+		/// </summary>
+		/// <param name="type">The exact type to look for.</param>
+		/// <returns>The matching transport, or null if this filter has no next transport to ask.</returns>
+		public Object Narrow(Type type)
+		{
+			if( this.GetType().Equals( type ) ) {
+				return this;
+			} else if( this.next != null ) {
+				return this.next.Narrow( type );
+			}
+
+			return null;
+		}
+
+		/// <summary>
+		/// Reports the wrapped transport's IsFaultTolerant value.
+		/// </summary>
+		public bool IsFaultTolerant
+		{
+			get{ return next.IsFaultTolerant; }
+		}
+
+		/// <summary>
+		/// Reports the wrapped transport's IsConnected value.
+		/// </summary>
+		public bool IsConnected
+		{
+			get{ return next.IsConnected; }
+		}
+
+		/// <summary>
+		/// Reports the wrapped transport's RemoteAddress value.
+		/// </summary>
+		public Uri RemoteAddress
+		{
+			get{ return next.RemoteAddress; }
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Thu Dec  3 23:11:03 2009
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Util
+{
+    /// <summary>
+    /// A FIFO channel of MessageDispatch items guarded by a single monitor.
+    /// Items can be enqueued at any time, but are only handed out while the
+    /// channel is running and not closed.
+    /// </summary>
+    public class MessageDispatchChannel
+    {
+        // Plain monitor object.  Every use of this field goes through lock /
+        // Monitor.*, so a System.Threading.Mutex here would only allocate an
+        // OS wait handle that is never used and never disposed.
+        private readonly object mutex = new object();
+        private bool closed;
+        private bool running;
+        private readonly LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
+
+        #region Properties
+
+        /// <summary>
+        /// The object this channel locks on; callers can lock it to group
+        /// several channel operations into one critical section.
+        /// </summary>
+        public object SyncRoot
+        {
+            get{ return this.mutex; }
+        }
+
+        /// <summary>
+        /// Whether the channel has been closed.
+        /// </summary>
+        public bool Closed
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.closed;
+                }
+            }
+
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.closed = value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Whether the channel currently hands out items.
+        /// </summary>
+        public bool Running
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.running;
+                }
+            }
+
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.running = value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// True when no items are queued.
+        /// </summary>
+        public bool Empty
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.channel.Count == 0;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Number of items currently queued.
+        /// </summary>
+        public long Count
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.channel.Count;
+                }
+            }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Marks the channel as running and wakes all waiters, unless the
+        /// channel has already been closed.
+        /// </summary>
+        public void Start()
+        {
+            lock(this.mutex)
+            {
+                if(!this.closed)
+                {
+                    this.running = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Marks the channel as not running and wakes all waiters.  Queued
+        /// items are kept.
+        /// </summary>
+        public void Stop()
+        {
+            lock(this.mutex)
+            {
+                this.running = false;
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Marks the channel as closed and not running, then wakes all waiters.
+        /// Waiters are woken even if the channel was already closed.
+        /// </summary>
+        public void Close()
+        {
+            lock(this.mutex)
+            {
+                if(!this.closed)
+                {
+                    this.running = false;
+                    this.closed = true;
+                }
+
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Appends an item to the tail of the channel and wakes one waiter.
+        /// </summary>
+        public void Enqueue(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddLast(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Inserts an item at the head of the channel and wakes one waiter.
+        /// </summary>
+        public void EnqueueFirst(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddFirst(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Removes and returns the head item, waiting at most once for up to
+        /// <paramref name="timeout"/> if the channel is open but has nothing
+        /// to deliver yet.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait; TimeSpan.Zero means do not wait.</param>
+        /// <returns>
+        /// The head item, or null if after the (single) wait the channel is
+        /// closed, not running, or empty.
+        /// </returns>
+        public MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                // Wait until the channel is ready to deliver messages.  This is
+                // a single wait, not a loop: any pulse ends it, and the state is
+                // simply re-checked below.
+                bool notReady = this.channel.Count == 0 || !this.running;
+                if( timeout != TimeSpan.Zero && !this.closed && notReady )
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+
+                if( this.closed || !this.running || this.channel.Count == 0 )
+                {
+                    return null;
+                }
+
+                return DequeueNoWait();
+            }
+        }
+
+        /// <summary>
+        /// Removes and returns the head item without waiting.
+        /// </summary>
+        /// <returns>The head item, or null if the channel is closed, not running, or empty.</returns>
+        public MessageDispatch DequeueNoWait()
+        {
+            lock(this.mutex)
+            {
+                if( this.closed || !this.running || this.channel.Count == 0 )
+                {
+                    return null;
+                }
+
+                MessageDispatch result = this.channel.First.Value;
+                this.channel.RemoveFirst();
+                return result;
+            }
+        }
+
+        /// <summary>
+        /// Returns the head item without removing it.
+        /// </summary>
+        /// <returns>The head item, or null if the channel is closed, not running, or empty.</returns>
+        public MessageDispatch Peek()
+        {
+            lock(this.mutex)
+            {
+                if( this.closed || !this.running || this.channel.Count == 0 )
+                {
+                    return null;
+                }
+
+                return this.channel.First.Value;
+            }
+        }
+
+        /// <summary>
+        /// Discards every queued item.
+        /// </summary>
+        public void Clear()
+        {
+            lock(this.mutex)
+            {
+                this.channel.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Removes every queued item and returns them in queue order.  Unlike
+        /// the dequeue methods this does not check the closed/running state.
+        /// </summary>
+        /// <returns>The removed items; an empty array if nothing was queued.</returns>
+        public MessageDispatch[] RemoveAll()
+        {
+            lock(this.mutex)
+            {
+                MessageDispatch[] result = new MessageDispatch[this.channel.Count];
+                this.channel.CopyTo(result, 0);
+                this.channel.Clear();
+                return result;
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native