You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/04 00:11:06 UTC
svn commit: r886978 [2/2] - in
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./
Commands/ Protocol/ Transport/ Transport/Tcp/ Util/
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Tcp
+{
+ /// <summary>
+ /// An implementation of ITransport that uses sockets to communicate with the broker
+ /// </summary>
+ public class TcpTransport : ITransport
+ {
+ private readonly object myLock = new object();
+ private readonly Socket socket;
+ private IWireFormat wireformat;
+ private BinaryReader socketReader;
+ private BinaryWriter socketWriter;
+ private Thread readThread;
+ private bool started;
+ private bool disposed = false;
+ private Atomic<bool> closed = new Atomic<bool>(false);
+ private volatile bool seenShutdown;
+ private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
+ private Uri connectedUri;
+
+ private CommandHandler commandHandler;
+ private ExceptionHandler exceptionHandler;
+ private InterruptedHandler interruptedHandler;
+ private ResumedHandler resumedHandler;
+ private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+ public TcpTransport(Uri uri, Socket socket, IWireFormat wireformat)
+ {
+ this.connectedUri = uri;
+ this.socket = socket;
+ this.wireformat = wireformat;
+ }
+
+ ~TcpTransport()
+ {
+ Dispose(false);
+ }
+
+ /// <summary>
+ /// Method Start
+ /// </summary>
+ public void Start()
+ {
+ lock(myLock)
+ {
+ if(!started)
+ {
+ if(null == commandHandler)
+ {
+ throw new InvalidOperationException(
+ "command cannot be null when Start is called.");
+ }
+
+ if(null == exceptionHandler)
+ {
+ throw new InvalidOperationException(
+ "exception cannot be null when Start is called.");
+ }
+
+ started = true;
+
+ // As reported in AMQ-988 it appears that NetworkStream is not thread safe
+ // so lets use an instance for each of the 2 streams
+ socketWriter = new EndianBinaryWriter(new NetworkStream(socket));
+ socketReader = new EndianBinaryReader(new NetworkStream(socket));
+
+ // now lets create the background read thread
+ readThread = new Thread(new ThreadStart(ReadLoop));
+ readThread.IsBackground = true;
+ readThread.Start();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Property IsStarted
+ /// </summary>
+ public bool IsStarted
+ {
+ get
+ {
+ lock(myLock)
+ {
+ return started;
+ }
+ }
+ }
+
+ public void Oneway(Command command)
+ {
+ lock(myLock)
+ {
+ if(closed.Value)
+ {
+ throw new InvalidOperationException("Error writing to broker. Transport connection is closed.");
+ }
+
+ if(command is ShutdownInfo)
+ {
+ seenShutdown = true;
+ }
+
+ Wireformat.Marshal(command, socketWriter);
+ }
+ }
+
+ public FutureResponse AsyncRequest(Command command)
+ {
+ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
+ }
+
+ /// <summary>
+ /// Property RequestTimeout
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+ get { return this.maxWait; }
+ set { this.maxWait = value; }
+ }
+
+ public bool TcpNoDelayEnabled
+ {
+#if !NETCF
+ get { return this.socket.NoDelay; }
+ set { this.socket.NoDelay = value; }
+#else
+ get { return false; }
+ set { }
+#endif
+ }
+
+ public Response Request(Command command)
+ {
+ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+ }
+
+ public Response Request(Command command, TimeSpan timeout)
+ {
+ throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
+ }
+
+ public void Stop()
+ {
+ Close();
+ }
+
+ public void Close()
+ {
+ if(closed.CompareAndSet(false, true))
+ {
+ lock(myLock)
+ {
+ try
+ {
+ socket.Shutdown(SocketShutdown.Both);
+ }
+ catch
+ {
+ }
+
+ try
+ {
+ if(null != socketWriter)
+ {
+ socketWriter.Close();
+ }
+ }
+ catch
+ {
+ }
+ finally
+ {
+ socketWriter = null;
+ }
+
+ try
+ {
+ if(null != socketReader)
+ {
+ socketReader.Close();
+ }
+ }
+ catch
+ {
+ }
+ finally
+ {
+ socketReader = null;
+ }
+
+ try
+ {
+ socket.Close();
+ }
+ catch
+ {
+ }
+
+ if(null != readThread)
+ {
+ if(Thread.CurrentThread != readThread
+#if !NETCF
+ && readThread.IsAlive
+#endif
+)
+ {
+ TimeSpan waitTime;
+
+ if(maxWait < MAX_THREAD_WAIT)
+ {
+ waitTime = maxWait;
+ }
+ else
+ {
+ waitTime = MAX_THREAD_WAIT;
+ }
+
+ if(!readThread.Join((int) waitTime.TotalMilliseconds))
+ {
+ readThread.Abort();
+ }
+ }
+
+ readThread = null;
+ }
+
+ started = false;
+ }
+ }
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ Close();
+ disposed = true;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
+ }
+
+ public void ReadLoop()
+ {
+ // This is the thread function for the reader thread. This runs continuously
+ // performing a blokcing read on the socket and dispatching all commands
+ // received.
+ //
+ // Exception Handling
+ // ------------------
+ // If an Exception occurs during the reading/marshalling, then the connection
+ // is effectively broken because position cannot be re-established to the next
+ // message. This is reported to the app via the exceptionHandler and the socket
+ // is closed to prevent further communication attempts.
+ //
+ // An exception in the command handler may not be fatal to the transport, so
+ // these are simply reported to the exceptionHandler.
+ //
+ while(!closed.Value)
+ {
+ Command command = null;
+
+ try
+ {
+ command = (Command) Wireformat.Unmarshal(socketReader);
+ }
+ catch(Exception ex)
+ {
+ command = null;
+ if(!closed.Value)
+ {
+ // Close the socket as there's little that can be done with this transport now.
+ Close();
+ if(!seenShutdown)
+ {
+ this.exceptionHandler(this, ex);
+ }
+ }
+
+ break;
+ }
+
+ try
+ {
+ if(command != null)
+ {
+ this.commandHandler(this, command);
+ }
+ }
+ catch(Exception e)
+ {
+ this.exceptionHandler(this, e);
+ }
+ }
+ }
+
+ // Implementation methods
+
+ public CommandHandler Command
+ {
+ get { return commandHandler; }
+ set { this.commandHandler = value; }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get { return exceptionHandler; }
+ set { this.exceptionHandler = value; }
+ }
+
+ public InterruptedHandler Interrupted
+ {
+ get { return interruptedHandler; }
+ set { this.interruptedHandler = value; }
+ }
+
+ public ResumedHandler Resumed
+ {
+ get { return resumedHandler; }
+ set { this.resumedHandler = value; }
+ }
+
+ public IWireFormat Wireformat
+ {
+ get { return wireformat; }
+ set { wireformat = value; }
+ }
+
+ public bool IsFaultTolerant
+ {
+ get { return false; }
+ }
+
+ public bool IsConnected
+ {
+ get { return socket.Connected; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get { return connectedUri; }
+ }
+
+ public Object Narrow(Type type)
+ {
+ if(this.GetType().Equals(type))
+ {
+ return this;
+ }
+
+ return null;
+ }
+
+ }
+}
+
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using System.Net;
+using System.Net.Sockets;
+using Apache.NMS.Stomp.Transport.Stomp;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Transport.Tcp
+{
+ public class TcpTransportFactory : ITransportFactory
+ {
+ public TcpTransportFactory()
+ {
+ }
+
+ #region Properties
+
+ private bool useLogging = false;
+ public bool UseLogging
+ {
+ get { return useLogging; }
+ set { useLogging = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of the receive buffer.
+ /// </summary>
+ private int receiveBufferSize = 8192;
+ public int ReceiveBufferSize
+ {
+ get { return receiveBufferSize; }
+ set { receiveBufferSize = value; }
+ }
+
+ /// <summary>
+ /// Size in bytes of send buffer.
+ /// </summary>
+ private int sendBufferSize = 8192;
+ public int SendBufferSize
+ {
+ get { return sendBufferSize; }
+ set { sendBufferSize = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. The default value is 0, which indicates
+ /// an infinite time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int receiveTimeout = 0;
+ public int ReceiveTimeout
+ {
+ get { return receiveTimeout; }
+ set { receiveTimeout = value; }
+ }
+
+ /// <summary>
+ /// The time-out value, in milliseconds. If you set the property with a value between 1 and 499,
+ /// the value will be changed to 500. The default value is 0, which indicates an infinite
+ /// time-out period. Specifying -1 also indicates an infinite time-out period.
+ /// </summary>
+ private int sendTimeout = 0;
+ public int SendTimeout
+ {
+ get { return sendTimeout; }
+ set { sendTimeout = value; }
+ }
+
+ private string wireFormat = "Stomp";
+ public string WireFormat
+ {
+ get { return wireFormat; }
+ set { wireFormat = value; }
+ }
+
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+ public int RequestTimeout
+ {
+ get { return (int) requestTimeout.TotalMilliseconds; }
+ set { requestTimeout = TimeSpan.FromMilliseconds(value); }
+ }
+
+ #endregion
+
+ #region ITransportFactory Members
+
+ public ITransport CompositeConnect(Uri location)
+ {
+ return CompositeConnect(location, null);
+ }
+
+ public ITransport CompositeConnect(Uri location, SetTransport setTransport)
+ {
+ // Extract query parameters from broker Uri
+ StringDictionary map = URISupport.ParseQuery(location.Query);
+
+ // Set transport. properties on this (the factory)
+ URISupport.SetProperties(this, map, "transport.");
+
+ Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+ Socket socket = Connect(location.Host, location.Port);
+
+#if !NETCF
+ socket.ReceiveBufferSize = ReceiveBufferSize;
+ socket.SendBufferSize = SendBufferSize;
+ socket.ReceiveTimeout = ReceiveTimeout;
+ socket.SendTimeout = SendTimeout;
+#endif
+
+ IWireFormat wireformat = CreateWireFormat(map);
+ ITransport transport = new TcpTransport(location, socket, wireformat);
+
+ wireformat.Transport = transport;
+
+ if(UseLogging)
+ {
+ transport = new LoggingTransport(transport);
+ }
+
+ if(wireformat is OpenWireFormat)
+ {
+ transport = new WireFormatNegotiator(transport, (OpenWireFormat) wireformat);
+ }
+
+ transport.RequestTimeout = this.requestTimeout;
+
+ if(setTransport != null)
+ {
+ setTransport(transport, location);
+ }
+
+ return transport;
+ }
+
+ public ITransport CreateTransport(Uri location)
+ {
+ ITransport transport = CompositeConnect(location);
+
+ transport = new MutexTransport(transport);
+ transport = new ResponseCorrelator(transport);
+ transport.RequestTimeout = this.requestTimeout;
+
+ return transport;
+ }
+
+ #endregion
+
+ // DISCUSSION: Caching host entries may not be the best strategy when using the
+ // failover protocol. The failover protocol needs to be very dynamic when looking
+ // up hostnames at runtime. If old hostname->IP mappings are kept around, this may
+ // lead to runtime failures that could have been avoided by dynamically looking up
+ // the new hostname IP.
+#if CACHE_HOSTENTRIES
+ private static IDictionary<string, IPHostEntry> CachedIPHostEntries = new Dictionary<string, IPHostEntry>();
+ private static readonly object _syncLock = new object();
+#endif
+ public static IPHostEntry GetIPHostEntry(string host)
+ {
+ IPHostEntry ipEntry;
+
+#if CACHE_HOSTENTRIES
+ string hostUpperName = host.ToUpper();
+
+ lock (_syncLock)
+ {
+ if (!CachedIPHostEntries.TryGetValue(hostUpperName, out ipEntry))
+ {
+ try
+ {
+ ipEntry = Dns.GetHostEntry(hostUpperName);
+ CachedIPHostEntries.Add(hostUpperName, ipEntry);
+ }
+ catch
+ {
+ ipEntry = null;
+ }
+ }
+ }
+#else
+ try
+ {
+ ipEntry = Dns.GetHostEntry(host);
+ }
+ catch
+ {
+ ipEntry = null;
+ }
+#endif
+
+ return ipEntry;
+ }
+
+ private Socket ConnectSocket(IPAddress address, int port)
+ {
+ if(null != address)
+ {
+ try
+ {
+ Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+
+ if(null != socket)
+ {
+ socket.Connect(new IPEndPoint(address, port));
+ if(socket.Connected)
+ {
+ return socket;
+ }
+ }
+ }
+ catch
+ {
+ }
+ }
+
+ return null;
+ }
+
+ public static bool TryParseIPAddress(string host, out IPAddress ipaddress)
+ {
+#if !NETCF
+ return IPAddress.TryParse(host, out ipaddress);
+#else
+ try
+ {
+ ipaddress = IPAddress.Parse(host);
+ }
+ catch
+ {
+ ipaddress = null;
+ }
+
+ return (null != ipaddress);
+#endif
+ }
+
+ public static IPAddress GetIPAddress(string hostname, AddressFamily addressFamily)
+ {
+ IPAddress ipaddress = null;
+ IPHostEntry hostEntry = GetIPHostEntry(hostname);
+
+ if(null != hostEntry)
+ {
+ ipaddress = GetIPAddress(hostEntry, addressFamily);
+ }
+
+ return ipaddress;
+ }
+
+ public static IPAddress GetIPAddress(IPHostEntry hostEntry, AddressFamily addressFamily)
+ {
+ if(null != hostEntry)
+ {
+ foreach(IPAddress address in hostEntry.AddressList)
+ {
+ if(address.AddressFamily == addressFamily)
+ {
+ return address;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ protected Socket Connect(string host, int port)
+ {
+ Socket socket = null;
+ IPAddress ipaddress;
+
+ try
+ {
+ if(TryParseIPAddress(host, out ipaddress))
+ {
+ socket = ConnectSocket(ipaddress, port);
+ }
+ else
+ {
+ // Looping through the AddressList allows different type of connections to be tried
+ // (IPv6, IPv4 and whatever else may be available).
+ IPHostEntry hostEntry = GetIPHostEntry(host);
+
+ if(null != hostEntry)
+ {
+ // Prefer IPv6 first.
+ ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetworkV6);
+ socket = ConnectSocket(ipaddress, port);
+ if(null == socket)
+ {
+ // Try IPv4 next.
+ ipaddress = GetIPAddress(hostEntry, AddressFamily.InterNetwork);
+ socket = ConnectSocket(ipaddress, port);
+ if(null == socket)
+ {
+ // Try whatever else there is.
+ foreach(IPAddress address in hostEntry.AddressList)
+ {
+ if(AddressFamily.InterNetworkV6 == address.AddressFamily
+ || AddressFamily.InterNetwork == address.AddressFamily)
+ {
+ // Already tried these protocols.
+ continue;
+ }
+
+ socket = ConnectSocket(address, port);
+ if(null != socket)
+ {
+ ipaddress = address;
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if(null == socket)
+ {
+ throw new SocketException();
+ }
+ }
+ catch(Exception ex)
+ {
+ throw new NMSConnectionException(String.Format("Error connecting to {0}:{1}.", host, port), ex);
+ }
+
+ Tracer.DebugFormat("Connected to {0}:{1} using {2} protocol.", host, port, ipaddress.AddressFamily.ToString());
+ return socket;
+ }
+
+ protected IWireFormat CreateWireFormat(StringDictionary map)
+ {
+ object properties = null;
+ IWireFormat wireFormatItf = null;
+
+ wireFormatItf = new StompWireFormat();
+ properties = wireFormatItf;
+
+ if(null != properties)
+ {
+ // Set wireformat. properties on the wireformat owned by the tcpTransport
+ URISupport.SetProperties(properties, map, "wireFormat.");
+ }
+
+ return wireFormatItf;
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Transport.Discovery;
+using Apache.NMS.Stomp.Transport.Failover;
+using Apache.NMS.Stomp.Transport.Mock;
+using Apache.NMS.Stomp.Transport.Tcp;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ public class TransportFactory
+ {
+ public static event ExceptionListener OnException;
+
+ public static void HandleException(Exception ex)
+ {
+ if(TransportFactory.OnException != null)
+ {
+ TransportFactory.OnException(ex);
+ }
+ }
+
+ /// <summary>
+ /// Creates a normal transport.
+ /// </summary>
+ /// <param name="location"></param>
+ /// <returns>the transport</returns>
+ public static ITransport CreateTransport(Uri location)
+ {
+ ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
+ return tf.CreateTransport(location);
+ }
+
+ public static ITransport CompositeConnect(Uri location)
+ {
+ ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
+ return tf.CompositeConnect(location);
+ }
+
+ public static ITransport AsyncCompositeConnect(Uri location, SetTransport setTransport)
+ {
+ ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
+ return tf.CompositeConnect(location, setTransport);
+ }
+
+ /// <summary>
+ /// Create a transport factory for the scheme. If we do not support the transport protocol,
+ /// an NMSConnectionException will be thrown.
+ /// </summary>
+ /// <param name="location"></param>
+ /// <returns></returns>
+ private static ITransportFactory CreateTransportFactory(Uri location)
+ {
+ string scheme = location.Scheme;
+
+ if(null == scheme || 0 == scheme.Length)
+ {
+ throw new NMSConnectionException(String.Format("Transport scheme invalid: [{0}]", location.ToString()));
+ }
+
+ ITransportFactory factory = null;
+
+ try
+ {
+ switch(scheme.ToLower())
+ {
+ case "tcp":
+ factory = new TcpTransportFactory();
+ break;
+ default:
+ throw new NMSConnectionException(String.Format("The transport {0} is not supported.", scheme));
+ }
+ }
+ catch(NMSConnectionException)
+ {
+ throw;
+ }
+ catch
+ {
+ throw new NMSConnectionException("Error creating transport.");
+ }
+
+ if(null == factory)
+ {
+ throw new NMSConnectionException("Unable to create a transport.");
+ }
+
+ return factory;
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+
+namespace Apache.NMS.Stomp.Transport
+{
+ /// <summary>
+ /// Used to implement a filter on the transport layer.
+ /// </summary>
+ public class TransportFilter : ITransport
+ {
+ protected readonly ITransport next;
+ protected CommandHandler commandHandler;
+ protected ExceptionHandler exceptionHandler;
+ protected InterruptedHandler interruptedHandler;
+ protected ResumedHandler resumedHandler;
+ private bool disposed = false;
+
+ public TransportFilter(ITransport next)
+ {
+ this.next = next;
+ this.next.Command = new CommandHandler(OnCommand);
+ this.next.Exception = new ExceptionHandler(OnException);
+ this.next.Interrupted = new InterruptedHandler(OnInterrupted);
+ this.next.Resumed = new ResumedHandler(OnResumed);
+ }
+
+ ~TransportFilter()
+ {
+ Dispose(false);
+ }
+
+ protected virtual void OnCommand(ITransport sender, Command command)
+ {
+ this.commandHandler(sender, command);
+ }
+
+ protected virtual void OnException(ITransport sender, Exception command)
+ {
+ this.exceptionHandler(sender, command);
+ }
+
+ protected virtual void OnInterrupted(ITransport sender)
+ {
+ if(this.interruptedHandler != null)
+ {
+ this.interruptedHandler(sender);
+ }
+ }
+
+ protected virtual void OnResumed(ITransport sender)
+ {
+ if(this.resumedHandler != null)
+ {
+ this.resumedHandler(sender);
+ }
+ }
+
+ /// <summary>
+ /// Method Oneway
+ /// </summary>
+ /// <param name="command">A Command</param>
+ public virtual void Oneway(Command command)
+ {
+ this.next.Oneway(command);
+ }
+
+ /// <summary>
+ /// Method AsyncRequest
+ /// </summary>
+ /// <returns>A FutureResponse</returns>
+ /// <param name="command">A Command</param>
+ public virtual FutureResponse AsyncRequest(Command command)
+ {
+ return this.next.AsyncRequest(command);
+ }
+
+ /// <summary>
+ /// Property RequestTimeout
+ /// </summary>
+ public TimeSpan RequestTimeout
+ {
+ get { return this.next.RequestTimeout; }
+ set { this.next.RequestTimeout = value; }
+ }
+
+ /// <summary>
+ /// Method Request
+ /// </summary>
+ /// <returns>A Response</returns>
+ /// <param name="command">A Command</param>
+ public virtual Response Request(Command command)
+ {
+ return Request(command, RequestTimeout);
+ }
+
+ /// <summary>
+ /// Method Request with time out for Response.
+ /// </summary>
+ /// <returns>A Response</returns>
+ /// <param name="command">A Command</param>
+ /// <param name="timeout">Timeout in milliseconds</param>
+ public virtual Response Request(Command command, TimeSpan timeout)
+ {
+ return this.next.Request(command, timeout);
+ }
+
+ /// <summary>
+ /// Method Start
+ /// </summary>
+ public virtual void Start()
+ {
+ if(commandHandler == null)
+ {
+ throw new InvalidOperationException("command cannot be null when Start is called.");
+ }
+
+ if(exceptionHandler == null)
+ {
+ throw new InvalidOperationException("exception cannot be null when Start is called.");
+ }
+
+ this.next.Start();
+ }
+
+ /// <summary>
+ /// Property IsStarted
+ /// </summary>
+ public bool IsStarted
+ {
+ get { return this.next.IsStarted; }
+ }
+
+ /// <summary>
+ /// Method Dispose
+ /// </summary>
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected virtual void Dispose(bool disposing)
+ {
+ if(disposing)
+ {
+ this.next.Dispose();
+ }
+ disposed = true;
+ }
+
+ public bool IsDisposed
+ {
+ get
+ {
+ return disposed;
+ }
+ }
+
+ public CommandHandler Command
+ {
+ get { return commandHandler; }
+ set { this.commandHandler = value; }
+ }
+
+ public ExceptionHandler Exception
+ {
+ get { return exceptionHandler; }
+ set { this.exceptionHandler = value; }
+ }
+
+ public InterruptedHandler Interrupted
+ {
+ get { return interruptedHandler; }
+ set { this.interruptedHandler = value; }
+ }
+
+ public ResumedHandler Resumed
+ {
+ get { return resumedHandler; }
+ set { this.resumedHandler = value; }
+ }
+
+ public virtual void Stop()
+ {
+ }
+
+ public Object Narrow(Type type)
+ {
+ if( this.GetType().Equals( type ) ) {
+ return this;
+ } else if( this.next != null ) {
+ return this.next.Narrow( type );
+ }
+
+ return null;
+ }
+
+ public bool IsFaultTolerant
+ {
+ get{ return next.IsFaultTolerant; }
+ }
+
+ public bool IsConnected
+ {
+ get{ return next.IsConnected; }
+ }
+
+ public Uri RemoteAddress
+ {
+ get{ return next.RemoteAddress; }
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/TransportFilter.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=886978&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Thu Dec 3 23:11:03 2009
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Util
+{
+ public class MessageDispatchChannel
+ {
+ private readonly Mutex mutex = new Mutex();
+ private bool closed;
+ private bool running;
+ private LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
+
+ #region Properties
+
+ public object SyncRoot
+ {
+ get{ return this.mutex; }
+ }
+
+ public bool Closed
+ {
+ get
+ {
+ lock(this.mutex)
+ {
+ return this.closed;
+ }
+ }
+
+ set
+ {
+ lock(this.mutex)
+ {
+ this.closed = value;
+ }
+ }
+ }
+
+ public bool Running
+ {
+ get
+ {
+ lock(this.mutex)
+ {
+ return this.running;
+ }
+ }
+
+ set
+ {
+ lock(this.mutex)
+ {
+ this.running = value;
+ }
+ }
+ }
+
+ public bool Empty
+ {
+ get
+ {
+ lock(mutex)
+ {
+ return channel.Count == 0;
+ }
+ }
+ }
+
+ public long Count
+ {
+ get
+ {
+ lock(mutex)
+ {
+ return channel.Count;
+ }
+ }
+ }
+
+ #endregion
+
+ public void Start()
+ {
+ lock(this.mutex)
+ {
+ if(!Closed)
+ {
+ this.running = true;
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+ }
+
+ public void Stop()
+ {
+ lock(mutex)
+ {
+ this.running = false;
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
+ public void Close()
+ {
+ lock(mutex)
+ {
+ if(!Closed)
+ {
+ this.running = false;
+ this.closed = true;
+ }
+
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
+ public void Enqueue(MessageDispatch dispatch)
+ {
+ lock(this.mutex)
+ {
+ this.channel.AddLast(dispatch);
+ Monitor.Pulse(this.mutex);
+ }
+ }
+
+ public void EnqueueFirst(MessageDispatch dispatch)
+ {
+ lock(this.mutex)
+ {
+ this.channel.AddFirst(dispatch);
+ Monitor.Pulse(this.mutex);
+ }
+ }
+
+ public MessageDispatch Dequeue(TimeSpan timeout)
+ {
+ lock(this.mutex)
+ {
+ // Wait until the channel is ready to deliver messages.
+ if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running ) )
+ {
+ Monitor.Wait(this.mutex, timeout);
+ }
+
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ return DequeueNoWait();
+ }
+ }
+
+ public MessageDispatch DequeueNoWait()
+ {
+ MessageDispatch result = null;
+
+ lock(this.mutex)
+ {
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ result = channel.First.Value;
+ this.channel.RemoveFirst();
+ }
+
+ return result;
+ }
+
+ public MessageDispatch Peek()
+ {
+ lock(this.mutex)
+ {
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ return channel.First.Value;
+ }
+ }
+
+ public void Clear()
+ {
+ lock(mutex)
+ {
+ this.channel.Clear();
+ }
+ }
+
+ public MessageDispatch[] RemoveAll()
+ {
+ MessageDispatch[] result;
+
+ lock(mutex)
+ {
+ result = new MessageDispatch[this.Count];
+ channel.CopyTo(result, 0);
+ channel.Clear();
+ }
+
+ return result;
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
------------------------------------------------------------------------------
svn:eol-style = native