You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/15 00:18:57 UTC

svn commit: r1542115 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp: ./ Commands/ Messages/ Threads/ Transport/ Transport/Tcp/ Util/ commands/

Author: tabish
Date: Thu Nov 14 23:18:56 2013
New Revision: 1542115

URL: http://svn.apache.org/r1542115
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Starting to implement the framework 

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/
      - copied from r1541299, activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/commands/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs   (with props)
Removed:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/commands/
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1542115&r1=1541299&r2=1542115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs Thu Nov 14 23:18:56 2013
@@ -29,6 +29,7 @@ namespace Apache.NMS.MQTT.Commands
 	public class CONNECT
 	{
 		public const byte TYPE = 1;
+		public const String PROTOCOL_NAME = "MQIsdp";
 
 		private byte version = 3;
 		public byte Version

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// ------------------------------------------------------------------------------
+//  <autogenerated>
+//      This code was generated by a tool.
+//      Mono Runtime Version: 4.0.30319.1
+// 
+//      Changes to this file may cause incorrect behavior and will be lost if 
+//      the code is regenerated.
+//  </autogenerated>
+// ------------------------------------------------------------------------------
+
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyDescriptionAttribute("Apache NMS for MQTT Class Library (.Net Messaging Library Implementation): An implementation of the NMS API for MQTT")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
+[assembly: AssemblyProductAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("1.7.0.3237")]
+[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,338 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using Apache.NMS.MQTT.Transport;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT
+{
+	public class Connection : IConnection
+	{
+		private readonly CONNECT info = null;
+		private ITransport transport;
+		private readonly Uri brokerUri;
+        private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+        private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+        private readonly object myLock = new object();
+        private readonly Atomic<bool> connected = new Atomic<bool>(false);
+        private readonly Atomic<bool> closed = new Atomic<bool>(false);
+        private readonly Atomic<bool> closing = new Atomic<bool>(false);
+        private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+        private Exception firstFailureError = null;
+        private int sessionCounter = 0;
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
+        private ConnectionMetaData metaData = null;
+        private bool disposed = false;
+        private readonly MessageTransformation messageTransformation;
+        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+
+		public Connection(Uri connectionUri, ITransport transport)
+		{
+			this.brokerUri = connectionUri;
+			this.clientIdGenerator = clientIdGenerator;
+
+			SetTransport(transport);
+
+			this.info = new CONNECT();
+		}
+
+		~Connection()
+		{
+			Dispose(false);
+		}
+
+		#region Properties
+
+		/// <summary>
+		/// A delegate that can receive transport level exceptions.
+		/// </summary>
+		public event ExceptionListener ExceptionListener;
+
+		/// <summary>
+		/// An asynchronous listener that is notified when a Fault tolerant connection
+		/// has been interrupted.
+		/// </summary>
+		public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+		/// <summary>
+		/// An asynchronous listener that is notified when a Fault tolerant connection
+		/// has been resumed.
+		/// </summary>
+		public event ConnectionResumedListener ConnectionResumedListener;
+
+		private ConsumerTransformerDelegate consumerTransformer;
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
+		public String UserName
+		{
+			get { return this.info.UserName; }
+			set { this.info.UserName = value; }
+		}
+
+		public String Password
+		{
+			get { return this.info.Password; }
+			set { this.info.Password = value; }
+		}
+
+		/// <summary>
+		/// This property sets the acknowledgment mode for the connection.
+		/// The URI parameter connection.ackmode can be set to a string value
+		/// that maps to the enumeration value.
+		/// </summary>
+		public string AckMode
+		{
+			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+		}
+
+		public IConnectionMetaData MetaData
+		{
+			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+		}
+
+		public Uri BrokerUri
+		{
+			get { return brokerUri; }
+		}
+
+		public ITransport ITransport
+		{
+			get { return transport; }
+			set { this.transport = value; }
+		}
+
+		public bool TransportFailed
+		{
+			get { return this.transportFailed.Value; }
+		}
+
+		public Exception FirstFailureError
+		{
+			get { return this.firstFailureError; }
+		}
+
+		public TimeSpan RequestTimeout
+		{
+			get { return this.requestTimeout; }
+			set { this.requestTimeout = value; }
+		}
+
+		public AcknowledgementMode AcknowledgementMode
+		{
+			get { return acknowledgementMode; }
+			set { this.acknowledgementMode = value; }
+		}
+
+		internal MessageTransformation MessageTransformation
+		{
+			get { return this.messageTransformation; }
+		}
+
+		#endregion
+
+		private void SetTransport(ITransport newTransport)
+		{
+			this.transport = newTransport;
+			this.transport.Command = new CommandHandler(OnCommand);
+			this.transport.Exception = new ExceptionHandler(OnTransportException);
+			this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+			this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+		}
+
+		/// <summary>
+		/// Starts asynchronous message delivery of incoming messages for this connection.
+		/// Synchronous delivery is unaffected.
+		/// </summary>
+		public void Start()
+		{
+			CheckConnected();
+			if(started.CompareAndSet(false, true))
+			{
+				lock(sessions.SyncRoot)
+				{
+					foreach(Session session in sessions)
+					{
+						session.Start();
+					}
+				}
+			}
+		}
+
+		/// <summary>
+		/// This property determines if the asynchronous message delivery of incoming
+		/// messages has been started for this connection.
+		/// </summary>
+		public bool IsStarted
+		{
+			get { return started.Value; }
+		}
+
+		/// <summary>
+		/// Temporarily stop asynchronous delivery of inbound messages for this connection.
+		/// The sending of outbound messages is unaffected.
+		/// </summary>
+		public void Stop()
+		{
+			if(started.CompareAndSet(true, false))
+			{
+				lock(sessions.SyncRoot)
+				{
+					foreach(Session session in sessions)
+					{
+						session.Stop();
+					}
+				}
+			}
+		}
+
+		/// <summary>
+		/// Creates a new session to work on this connection
+		/// </summary>
+		public ISession CreateSession()
+		{
+			return CreateMQTTSession(acknowledgementMode);
+		}
+
+		/// <summary>
+		/// Creates a new session to work on this connection
+		/// </summary>
+		public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+		{
+			return CreateMQTTSession(sessionAcknowledgementMode);
+		}
+
+		protected virtual Session CreateMQTTSession(AcknowledgementMode ackMode)
+		{
+			CheckConnected();
+			return new Session(this, NextSessionId, ackMode);
+		}
+
+		internal void AddSession(Session session)
+		{
+			if(!this.closing.Value)
+			{
+				sessions.Add(session);
+			}
+		}
+
+		internal void RemoveSession(Session session)
+		{
+			if(!this.closing.Value)
+			{
+				sessions.Remove(session);
+				RemoveDispatcher(session);
+			}
+		}
+
+		internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
+		{
+			if(!this.closing.Value)
+			{
+				this.dispatchers.Add(id, dispatcher);
+			}
+		}
+
+		internal void RemoveDispatcher(ConsumerId id)
+		{
+			if(!this.closing.Value)
+			{
+				this.dispatchers.Remove(id);
+			}
+		}
+
+		internal void AddProducer(ProducerId id, MessageProducer producer)
+		{
+			if(!this.closing.Value)
+			{
+				this.producers.Add(id, producer);
+			}
+		}
+
+		internal void RemoveProducer(ProducerId id)
+		{
+			if(!this.closing.Value)
+			{
+				this.producers.Remove(id);
+			}
+		}
+
+	    internal void RemoveDispatcher(IDispatcher dispatcher) 
+		{
+	        this.connectionAudit.RemoveDispatcher(dispatcher);
+	    }
+
+		public void Close()
+		{
+			// TODO
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			try
+			{
+				Close();
+			}
+			catch
+			{
+				// Ignore network errors.
+			}
+
+			disposed = true;
+		}
+
+		protected void CheckClosedOrFailed()
+		{
+			CheckClosed();
+			if(transportFailed.Value)
+			{
+				throw new ConnectionFailedException(firstFailureError.Message);
+			}
+		}
+
+		protected void CheckClosed()
+		{
+			if(closed.Value)
+			{
+				throw new ConnectionClosedException();
+			}
+		}
+
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT
+{
+	/// <summary>
+	/// Exception thrown when a connection is used that it already closed
+	/// </summary>
+	[Serializable]
+	public class ConnectionClosedException : NMSException
+	{
+		public ConnectionClosedException() : base("The connection is already closed!")
+		{
+		}
+
+		public ConnectionClosedException(string message)
+			: base(message)
+		{
+		}
+
+		public ConnectionClosedException(string message, string errorCode)
+			: base(message, errorCode)
+		{
+		}
+
+		public ConnectionClosedException(string message, Exception innerException)
+			: base(message, innerException)
+		{
+		}
+
+		public ConnectionClosedException(string message, string errorCode, Exception innerException)
+			: base(message, errorCode, innerException)
+		{
+		}
+
+		#region ISerializable interface implementation
+
+		/// <summary>
+		/// Initializes a new instance of the ConnectionClosedException class with serialized data.
+		/// Throws System.ArgumentNullException if the info parameter is null.
+		/// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+		/// </summary>
+		/// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+		/// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+		protected ConnectionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+			: base(info, context)
+		{
+		}
+
+		#endregion
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs?rev=1542115&r1=1542114&r2=1542115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs Thu Nov 14 23:18:56 2013
@@ -47,8 +47,7 @@ namespace Apache.NMS.MQTT
             this.providerMinorVersion = asmName.Version.Minor;
             this.providerVersion = asmName.Version.ToString();
 
-            this.nmsxProperties =
-                new String[]{ "NMSXGroupID", "NMSXGroupSeq", "NMSXDeliveryCount", "NMSXProducerTXID" };
+			this.nmsxProperties = null;
 
             foreach(AssemblyName name in self.GetReferencedAssemblies())
             {

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT
+{
+    /// <summary>
+    /// Interface that provides for a Class to provide dispatching service for
+    /// an OpenWire MessageDispatch command.
+    /// </summary>
+    public interface IDispatcher
+    {
+        void Dispatch(MessageDispatch messageDispatch);
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ
+{
+
+	/// <summary>
+	/// Exception thrown when an IO error occurs
+	/// </summary>
+	public class IOException : NMSException
+	{
+		public IOException()
+			: base("IO Exception failed with missing exception log")
+		{
+		}
+
+		public IOException(String msg)
+			: base(msg)
+		{
+		}
+
+		public IOException(String msg, Exception inner)
+			: base(msg, inner)
+		{
+		}
+	}
+}
+
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,68 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
+using Apache.NMS.MQTT.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT
+{
+	public class MessageConsumer : IMessageConsumer, IDispatcher
+	{
+        private readonly MessageTransformation messageTransformation;
+        private readonly MessageDispatchChannel unconsumedMessages;
+        private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+
+		private readonly Atomic<bool> started = new Atomic<bool>();
+
+        private Exception failureError;
+		private ThreadPoolExecutor executor;
+
+		private event MessageListener listener;
+
+		public MessageConsumer()
+		{
+		}
+
+		#region Property Accessors
+
+        public Exception FailureError
+        {
+            get { return this.failureError; }
+            set { this.failureError = value; }
+        }
+
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		#endregion
+
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,190 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
+
+namespace Apache.NMS.MQTT
+{
+	public class MessageProducer : IMessageProducer
+	{
+		private readonly Session session;
+		private readonly object closedLock = new object();
+		private bool closed = false;
+
+		private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
+		private TimeSpan requestTimeout;
+		protected bool disposed = false;
+
+		private readonly MessageTransformation messageTransformation;
+
+		public MessageProducer(Session session, TimeSpan requestTimeout)
+		{
+			this.session = session;
+			this.RequestTimeout = requestTimeout;
+			this.messageTransformation = session.Connection.MessageTransformation;
+		}
+
+		~MessageProducer()
+		{
+			Dispose(false);
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			try
+			{
+				Close();
+			}
+			catch
+			{
+			}
+
+			disposed = true;
+		}
+
+		public void Close()
+		{
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					return;
+				}
+
+				Shutdown();
+				if(Tracer.IsDebugEnabled)
+				{
+					Tracer.DebugFormat("Remove of Producer[{0}] sent.", this.ProducerId);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Called from the Parent session to deactivate this Producer, when a parent
+		/// is closed all children are automatically removed from the broker so this
+		/// method circumvents the need to send a Remove command to the broker.
+		/// </summary>
+		internal void Shutdown()
+		{
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					return;
+				}
+
+				try
+				{
+					session.RemoveProducer(info.ProducerId);
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Error during producer close: {0}", ex);
+				}
+
+				closed = true;
+			}
+		}
+
+		public void Send(IMessage message)
+		{
+			Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+		}
+
+		public void Send(IDestination destination, IMessage message)
+		{
+			Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+		}
+
+		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+		{
+			Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+		}
+
+		public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+		{
+			Send(destination, message, deliveryMode, priority, timeToLive, true);
+		}
+
+		protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+		{
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
+		public IMessage CreateMessage()
+		{
+			return session.CreateMessage();
+		}
+
+		public ITextMessage CreateTextMessage()
+		{
+			return session.CreateTextMessage();
+		}
+
+		public ITextMessage CreateTextMessage(string text)
+		{
+			return session.CreateTextMessage(text);
+		}
+
+		public IMapMessage CreateMapMessage()
+		{
+			return session.CreateMapMessage();
+		}
+
+		public IObjectMessage CreateObjectMessage(object body)
+		{
+			return session.CreateObjectMessage(body);
+		}
+
+		public IBytesMessage CreateBytesMessage()
+		{
+			return session.CreateBytesMessage();
+		}
+
+		public IBytesMessage CreateBytesMessage(byte[] body)
+		{
+			return session.CreateBytesMessage(body);
+		}
+
+		public IStreamMessage CreateStreamMessage()
+		{
+			return session.CreateStreamMessage();
+		}
+
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,425 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Collections;
+
+namespace Apache.NMS.MQTT
+{
+	public class Session : ISession, IDispatcher
+	{
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
+
+        private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+
+        private readonly SessionExecutor executor;
+        private readonly Connection connection;
+
+        private int consumerCounter;
+        private int producerCounter;
+
+        protected bool disposed = false;
+        protected bool closed = false;
+        protected bool closing = false;
+
+        private readonly AcknowledgementMode acknowledgementMode;
+
+		public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+		{
+            this.connection = connection;
+            this.acknowledgementMode = acknowledgementMode;
+
+            this.ConsumerTransformer = connection.ConsumerTransformer;
+            this.ProducerTransformer = connection.ProducerTransformer;
+
+            this.executor = new SessionExecutor(this, this.consumers);
+
+            if(connection.IsStarted)
+            {
+                this.Start();
+            }
+
+            connection.AddSession(this);
+		}
+
+        ~Session()
+        {
+            Dispose(false);
+        }
+
+        #region Property Accessors
+
+        public virtual AcknowledgementMode AcknowledgementMode
+        {
+            get { return this.acknowledgementMode; }
+        }
+
+        public virtual bool IsClientAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
+        }
+
+        public virtual bool IsAutoAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
+        }
+
+        public virtual bool IsDupsOkAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
+        }
+
+        public virtual bool IsIndividualAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
+        }
+
+        public virtual bool IsTransacted
+        {
+            get{ return this.acknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        public SessionExecutor Executor
+        {
+            get { return this.executor; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        /// <summary>
+        /// A Delegate that is called each time a Message is dispatched to allow the client to do
+        /// any necessary transformations on the received message before it is delivered.
+        /// The Session instance sets the delegate on each Consumer it creates.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        /// <summary>
+        /// A delegate that is called each time a Message is sent from this Producer which allows
+        /// the application to perform any needed transformations on the Message before it is sent.
+        /// The Session instance sets the delegate on each Producer it creates.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+		#endregion
+
+        #region ISession Members
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected void Dispose(bool disposing)
+        {
+            if(this.disposed)
+            {
+                return;
+            }
+
+            try
+            {
+                // Force a Stop when we are Disposing vs a Normal Close.
+                this.executor.Stop(this.disposeStopTimeout);
+
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            this.disposed = true;
+        }
+
+        public virtual void Close()
+        {
+            if (!this.closed)
+            {
+                try
+                {
+                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
+                    DoClose();
+                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
+                }
+                catch (Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+            }
+        }
+
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            MessageProducer producer = null;
+			// TODO
+            return producer;
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            if(destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
+
+            MessageConsumer consumer = null;
+            return consumer;
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
+            if(destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
+
+            MessageConsumer consumer = null;
+            return consumer;
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public ITopic GetTopic(string name)
+        {
+			return null;  // TODO
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            throw new NotSupportedException("MQTT Cannot delete Destinations");
+        }
+
+        public IMessage CreateMessage()
+        {
+            throw new NotSupportedException("No empty Message in MQTT");
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            throw new NotSupportedException("No Text Message in MQTT");
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            throw new NotSupportedException("No Text Message in MQTT");
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            throw new NotSupportedException("No Map Message in MQTT");
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return ConfigureMessage(new BytesMessage()) as IBytesMessage;
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            BytesMessage answer = new BytesMessage();
+            answer.Content = body;
+            return ConfigureMessage(answer) as IBytesMessage;
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            throw new NotSupportedException("No Object Message in MQTT");
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            throw new NotSupportedException("No Object Message in MQTT");
+        }
+
+        public void Commit()
+        {
+            throw new NotSupportedException("No Transaction support in MQTT");
+        }
+
+        public void Rollback()
+        {
+            throw new NotSupportedException("No Transaction support in MQTT");
+        }
+
+        public void Recover()
+        {
+            CheckClosed();
+
+            if (acknowledgementMode == AcknowledgementMode.Transactional)
+            {
+                throw new IllegalStateException("Cannot Recover a Transacted Session");
+            }
+
+            throw new NotSupportedException("No Recover support yet");
+        }
+
+        public void Stop()
+        {
+            if(this.executor != null)
+            {
+                this.executor.Stop();
+            }
+        }
+
+        public void Start()
+        {
+            foreach(MessageConsumer consumer in this.consumers.Values)
+            {
+                consumer.Start();
+            }
+
+            if(this.executor != null)
+            {
+                this.executor.Start();
+            }
+        }
+
+        public bool Started
+        {
+            get
+            {
+                return this.executor != null ? this.executor.Running : false;
+            }
+        }
+
+		#endregion
+
+        public void AddConsumer(MessageConsumer consumer)
+        {
+            if(!this.closing)
+            {
+                ConsumerId id = consumer.ConsumerId;
+
+                // Registered with Connection before we register at the broker.
+                consumers[id] = consumer;
+                connection.AddDispatcher(id, this);
+            }
+        }
+
+        public void RemoveConsumer(MessageConsumer consumer)
+        {
+            connection.RemoveDispatcher(consumer.ConsumerId);
+            if(!this.closing)
+            {
+                consumers.Remove(consumer.ConsumerId);
+            }
+			connection.RemoveDispatcher(consumer);
+        }
+
+        public void AddProducer(MessageProducer producer)
+        {
+            if(!this.closing)
+            {
+                ProducerId id = producer.ProducerId;
+
+                this.producers[id] = producer;
+                this.connection.AddProducer(id, producer);
+            }
+        }
+
+        public void RemoveProducer(ProducerId objectId)
+        {
+            connection.RemoveProducer(objectId);
+            if(!this.closing)
+            {
+                producers.Remove(objectId);
+            }
+        }
+
+        private MQTTMessage ConfigureMessage(MQTTMessage message)
+        {
+            message.Connection = this.connection;
+            return message;
+        }
+
+        private void CheckClosed()
+        {
+            if (closed)
+            {
+                throw new IllegalStateException("Session is Closed");
+            }
+        }
+
+        /// <summary>
+        /// Prevents message from throwing an exception if a client calls Acknoweldge on
+        /// a message that is part of a transaction either being produced or consumed.  The
+        /// JMS Spec indicates that users should be able to call Acknowledge with no effect
+        /// if the message is in a transaction.
+        /// </summary>
+        /// <param name="message">
+        /// A <see cref="ActiveMQMessage"/>
+        /// </param>
+        private static void DoNothingAcknowledge(ActiveMQMessage message)
+        {
+        }
+
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
+using Apache.NMS.MQTT.Threads;
+
+namespace Apache.NMS.MQTT
+{
+    public class SessionExecutor : Threads.Task
+    {
+        private readonly MessageDispatchChannel messageQueue = null;
+        private TaskRunner taskRunner = null;
+
+        private readonly Session session = null;
+        private readonly IDictionary consumers = null;
+
+        public SessionExecutor(Session session, IDictionary consumers)
+        {
+            this.session = session;
+            this.consumers = consumers;
+
+            if(this.session.Connection != null && this.session.Connection.MessagePrioritySupported)
+            {
+               this.messageQueue = new SimplePriorityMessageDispatchChannel();
+            }
+            else
+            {
+                this.messageQueue = new FifoMessageDispatchChannel();
+            }
+        }
+
+        ~SessionExecutor()
+        {
+            try
+            {
+                Stop();
+                Close();
+                Clear();
+            }
+            catch
+            {
+            }
+        }
+
+        public void Execute(MessageDispatch dispatch)
+        {
+            // Add the data to the queue.
+            this.messageQueue.Enqueue(dispatch);
+            this.Wakeup();
+        }
+
+        public void ExecuteFirst(MessageDispatch dispatch)
+        {
+            // Add the data to the queue.
+            this.messageQueue.EnqueueFirst(dispatch);
+            this.Wakeup();
+        }
+
+        public void Wakeup()
+        {
+            TaskRunner taskRunner = this.taskRunner;
+
+            lock(messageQueue.SyncRoot)
+            {
+                if(this.taskRunner == null)
+                {
+                    this.taskRunner = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(this);
+                }
+
+                taskRunner = this.taskRunner;
+            }
+
+            taskRunner.Wakeup();
+        }
+
+        public void Start()
+        {
+            if(!messageQueue.Running)
+            {
+                messageQueue.Start();
+
+                if(HasUncomsumedMessages)
+                {
+                    this.Wakeup();
+                }
+            }
+        }
+
+        public void Stop()
+        {
+            if(messageQueue.Running)
+            {
+                messageQueue.Stop();
+                TaskRunner taskRunner = this.taskRunner;
+
+                if(taskRunner != null)
+                {
+                    this.taskRunner = null;
+                    taskRunner.Shutdown();
+                }
+            }
+        }
+
+        public void Stop(TimeSpan timeout)
+        {
+            if(messageQueue.Running)
+            {
+                messageQueue.Stop();
+                TaskRunner taskRunner = this.taskRunner;
+
+                if(taskRunner != null)
+                {
+                    this.taskRunner = null;
+                    taskRunner.ShutdownWithAbort(timeout);
+                }
+            }
+        }
+
+        public void Close()
+        {
+            this.messageQueue.Close();
+        }
+
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            try
+            {
+                MessageConsumer consumer = null;
+
+                lock(this.consumers.SyncRoot)
+                {
+                    if(this.consumers.Contains(dispatch.ConsumerId))
+                    {
+                        consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
+                    }
+                }
+				
+                // If the consumer is not available, just ignore the message.
+                // Otherwise, dispatch the message to the consumer.
+                if(consumer != null)
+                {
+                    consumer.Dispatch(dispatch);
+                }
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+            }
+        }
+
+        public bool Iterate()
+        {
+            try
+            {
+                lock(this.consumers.SyncRoot)
+                {
+                    // Deliver any messages queued on the consumer to their listeners.
+                    foreach( MessageConsumer consumer in this.consumers.Values )
+                    {
+                        if(consumer.Iterate())
+                        {
+                            return true;
+                        }
+                    }
+                }
+
+                // No messages left queued on the listeners.. so now dispatch messages
+                // queued on the session
+                MessageDispatch message = messageQueue.DequeueNoWait();
+
+                if(message != null)
+                {
+                    this.Dispatch(message);
+                    return !messageQueue.Empty;
+                }
+
+                return false;
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+                this.session.Connection.OnSessionException(this.session, ex);
+            }
+
+            return true;
+        }
+
+        public void ClearMessagesInProgress()
+        {
+            this.messageQueue.Clear();
+        }
+
+        public void Clear()
+        {
+            this.messageQueue.Clear();
+        }
+
+        public MessageDispatch[] UnconsumedMessages
+        {
+            get{ return messageQueue.RemoveAll(); }
+        }
+
+        public bool HasUncomsumedMessages
+        {
+            get{ return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty; }
+        }
+
+        public bool Running
+        {
+            get{ return this.messageQueue.Running; }
+        }
+
+        public bool Empty
+        {
+            get{ return this.messageQueue.Empty; }
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+    /// <summary>
+    /// This class provides a wrapper around the ThreadPool mechanism in .NET
+    /// to allow for serial execution of jobs in the ThreadPool and provide
+    /// a means of shutting down the execution of jobs in a deterministic
+    /// way.
+    /// </summary>
+    public class ThreadPoolExecutor
+    {
+        private Queue<Future> workQueue = new Queue<Future>();
+        private Mutex syncRoot = new Mutex();
+        private bool running = false;
+        private bool closing = false;
+        private bool closed = false;
+        private ManualResetEvent executionComplete = new ManualResetEvent(true);
+        private Thread workThread = null;
+
+        /// <summary>
+        /// Represents an asynchronous task that is executed on the ThreadPool
+        /// at some point in the future.
+        /// </summary>
+        internal class Future
+        {
+            private readonly WaitCallback callback;
+            private readonly object callbackArg;
+
+            public Future(WaitCallback callback, object arg)
+            {
+                this.callback = callback;
+                this.callbackArg = arg;
+            }
+
+            public void Run()
+            {
+                if(this.callback == null)
+                {
+                    throw new Exception("Future executed with null WaitCallback");
+                }
+
+                try
+                {
+                    this.callback(callbackArg);
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        public void QueueUserWorkItem(WaitCallback worker)
+        {
+            this.QueueUserWorkItem(worker, null);
+        }
+
+        public void QueueUserWorkItem(WaitCallback worker, object arg)
+        {
+            if(worker == null)
+            {
+                throw new ArgumentNullException("Invalid WaitCallback passed");
+            }
+
+            if(!this.closed)
+            {
+                lock(syncRoot)
+                {
+                    if(!this.closed || !this.closing)
+                    {
+                        this.workQueue.Enqueue(new Future(worker, arg));
+
+                        if(!this.running)
+                        {
+                            this.executionComplete.Reset();
+                            this.running = true;
+                            ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                        }
+                    }
+                }
+            }
+        }
+
+		/// <summary>
+		/// Returns true if this ThreadPoolExecutor has been shut down but has not 
+		/// finished running all the tasks that have been Queue.  When a ThreadPoolExecutor
+		/// is shut down it will not accept any new tasks but it will complete all tasks
+		/// that have been previously queued.
+		/// </summary>
+        public bool IsShutdown
+        {
+            get { return this.closing; }
+        }
+
+		/// <summary>
+		/// Returns true if this ThreadPoolExecutor has been shut down and has also
+		/// completed processing of all outstanding tasks in its task Queue.
+		/// </summary>
+        public bool IsTerminated
+        {
+            get { return this.closed; }
+        }
+
+        public void Shutdown()
+        {
+            if(!this.closed)
+            {
+				lock(this.syncRoot)
+				{
+	                if(!this.closed)
+	                {
+	                    this.closing = true;
+
+						// Must be no tasks in Queue and none can be accepted
+						// now that we've flipped the closing toggle so safe to
+						// mark this ThreadPoolExecutor as closed.
+						if (!this.running)
+						{
+							this.closed = true;
+							this.executionComplete.Set();
+						}
+	                }
+				}
+            }
+        }
+
+		public bool AwaitTermination(TimeSpan timeout) 
+		{
+            if(!this.closed)
+            {
+                syncRoot.WaitOne();
+
+                if(!this.closed)
+                {
+					// If called from the worker thread we can't check this as it 
+					// will deadlock us, just return whatever the closed state is.
+                    if(this.running && Thread.CurrentThread != this.workThread)
+                    {
+                        syncRoot.ReleaseMutex();
+                        this.closed = this.executionComplete.WaitOne(timeout, false);
+                        syncRoot.WaitOne();
+                    }
+                }
+
+                syncRoot.ReleaseMutex();
+            }
+
+			return this.closed;
+		}
+
+        private void QueueProcessor(object unused)
+        {
+            Future theTask = null;
+
+            lock(syncRoot)
+            {
+                this.workThread = Thread.CurrentThread;
+
+                if(this.workQueue.Count == 0)
+                {
+                    this.running = false;
+                    this.executionComplete.Set();
+                    return;
+                }
+
+                theTask = this.workQueue.Dequeue();
+            }
+
+            try
+            {
+                theTask.Run();
+            }
+            finally
+            {
+                this.workThread = null;
+
+                if(this.workQueue.Count == 0)
+                {
+            		lock(syncRoot)
+					{
+                    	this.running = false;
+                    	this.executionComplete.Set();
+					}
+                }
+                else
+                {
+                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                }
+            }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Stomp.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+	public delegate void CommandHandler(ITransport sender, Command command);
+	public delegate void ExceptionHandler(ITransport sender, Exception command);
+	public delegate void InterruptedHandler(ITransport sender);
+	public delegate void ResumedHandler(ITransport sender);
+
+	/// <summary>
+	/// Represents the logical networking transport layer.  Transports implment the low
+    /// level protocol specific portion of the Communication between the Client and a Broker
+    /// such as TCP, UDP, etc.  Transports make use of WireFormat objects to handle translateing
+    /// the cononical OpenWire Commands used in this client into binary wire level packets that
+    /// can be sent to the Broker or Service that the Transport connects to.
+	/// </summary>
+	public interface ITransport : IStartable, IDisposable, IStoppable
+	{
+        /// <summary>
+        /// Sends a Command object on the Wire but does not wait for any response from the
+        /// receiver before returning.  
+        /// </summary>
+        /// <param name="command">
+        /// A <see cref="Command"/>
+        /// </param>
+		void Oneway(Command command);
+
+        /// <summary>
+        /// Sends a Command object which requires a response from the Broker but does not
+        /// wait for the response, instead a FutureResponse object is returned that the
+        /// caller can use to wait on the Broker's response.
+        /// </summary>
+		FutureResponse AsyncRequest(Command command);
+
+        /// <summary>
+        /// Sends a Command to the Broker and waits for a Response to that Command before
+        /// returning, this version waits indefinitely for a response.
+        /// </summary>
+		Response Request(Command command);
+
+        /// <summary>
+        /// Sends a Command to the Broker and waits for the given TimeSpan to expire for a
+        /// response before returning.  
+        /// </summary>
+		Response Request(Command command, TimeSpan timeout);
+
+        /// <summary>
+        /// Allows a caller to find a specific type of Transport in the Chain of
+        /// Transports that is created.  This allows a caller to find a specific
+        /// object in the Transport chain and set or get properties on that specific
+        /// instance.  If the requested type isn't in the chain than Null is returned.
+        /// </summary>
+        Object Narrow(Type type);            
+
+		/// <summary>
+		/// Timeout in milliseconds to wait for sending synchronous messages or commands.
+		/// Set to -1 for infinite timeout.
+		/// </summary>
+		int Timeout
+		{
+			get;
+			set;
+		}
+
+		/// <summary>
+		/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
+		/// Set to -1 for infinite timeout.
+		/// </summary>
+		int AsyncTimeout
+		{
+			get;
+			set;
+		}
+
+		CommandHandler Command
+		{
+			get;
+			set;
+		}
+
+		ExceptionHandler Exception
+		{
+			get;
+			set;
+		}
+
+		InterruptedHandler Interrupted
+		{
+			get;
+			set;
+		}		
+
+		ResumedHandler Resumed
+		{
+			get;
+			set;
+		}		
+
+        /// <value>
+        /// Indicates if this Transport has already been disposed and can no longer
+        /// be used.
+        /// </value>
+		bool IsDisposed
+		{
+			get;
+		}
+
+        /// <value>
+        /// Indicates if this Transport is Fault Tolerant or not.  A fault Tolerant
+        /// Transport handles low level connection errors internally allowing a client
+        /// to remain unaware of wire level disconnection and reconnection details.
+        /// </value>
+        bool IsFaultTolerant
+        {
+            get;
+        }
+
+        /// <value>
+        /// Indiciates if the Transport is current Connected to is assigned URI.
+        /// </value>
+        bool IsConnected
+        {
+            get;
+        }
+
+        /// <value>
+        /// The Remote Address that this transport is currently connected to.
+        /// </value>
+        Uri RemoteAddress
+        {
+            get;
+        }
+        
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT.Transport
+{
+	public delegate void SetTransport(ITransport transport, Uri uri);
+
+	public interface ITransportFactory
+	{
+		ITransport CreateTransport(Uri location);
+		ITransport CompositeConnect(Uri location);
+		ITransport CompositeConnect(Uri location, SetTransport setTransport);
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Util
+{
+    /// <summary>
+    /// A FIFO based MessageDispatchChannel.
+    /// </summary>
+    public class FifoMessageDispatchChannel : MessageDispatchChannel
+    {
+        private readonly Mutex mutex = new Mutex();
+        private bool closed;
+        private bool running;
+        private readonly LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
+
+        #region Properties
+
+        public object SyncRoot
+        {
+            get{ return this.mutex; }
+        }
+        
+        public bool Closed
+        {
+            get 
+            {
+                lock(this.mutex)
+                {
+                    return this.closed; 
+                }
+            }
+            
+            set 
+            {
+                lock(this.mutex)
+                {
+                    this.closed = value;
+                }
+            }
+        }
+
+        public bool Running
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.running;
+                }
+            }
+            
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.running = value;
+                }
+            }
+        }
+
+        public bool Empty
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return channel.Count == 0;
+                }
+            }
+        }
+
+        public long Count
+        {
+            get
+            {
+                lock(mutex)
+                {
+                    return channel.Count;
+                }
+            }
+        }
+
+        #endregion
+
+        public void Start()
+        {
+            lock(this.mutex)
+            {
+                if(!Closed)
+                {
+                    this.running = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        public void Stop()
+        {
+            lock(mutex)
+            {
+                this.running = false;
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        public void Close()
+        {
+            lock(mutex)
+            {
+                if(!Closed)
+                {
+                    this.running = false;
+                    this.closed = true;
+                }          
+
+                Monitor.PulseAll(this.mutex);
+            }            
+        }
+        
+        public void Enqueue(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddLast(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public void EnqueueFirst(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddFirst(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        public MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                // Wait until the channel is ready to deliver messages.
+                if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running ) )
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+        
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+        
+                return DequeueNoWait();                      
+            }
+        }
+
+        public MessageDispatch DequeueNoWait()
+        {
+            MessageDispatch result = null;
+            
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                result = channel.First.Value;
+                this.channel.RemoveFirst();
+            }
+
+            return result;
+        }
+
+        public MessageDispatch Peek()
+        {
+            lock(this.mutex)
+            {
+                if( Closed || !Running || Empty ) 
+                {
+                    return null;
+                }
+                
+                return channel.First.Value;
+            }
+        }
+
+        public void Clear()
+        {
+            lock(mutex)
+            {
+                this.channel.Clear();
+            }
+        }
+
+        public MessageDispatch[] RemoveAll()
+        {
+            MessageDispatch[] result;
+            
+            lock(mutex)
+            {
+                result = new MessageDispatch[this.Count];
+                channel.CopyTo(result, 0);
+                channel.Clear();
+            }
+
+            return result;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.Util;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Util
+{
+    public class MQTTMessageTransformation : MessageTransformation
+    {
+        private readonly Connection connection;
+
+        public MQTTMessageTransformation(Connection connection) : base()
+        {
+            this.connection = connection;
+        }
+
+        #region Creation Methods and Conversion Support Methods
+
+        protected override IMessage DoCreateMessage()
+        {
+            throw new NotSupportedException("MQTT Cannot process Empty Messages");
+        }
+
+        protected override IBytesMessage DoCreateBytesMessage()
+        {
+			return null;
+        }
+
+        protected override ITextMessage DoCreateTextMessage()
+        {
+            throw new NotSupportedException("MQTT Cannot process Text Messages");
+        }
+
+        protected override IStreamMessage DoCreateStreamMessage()
+        {
+            throw new NotSupportedException("MQTT Cannot process Stream Messages");
+        }
+
+        protected override IMapMessage DoCreateMapMessage()
+        {
+            throw new NotSupportedException("MQTT Cannot process Map Messages");
+        }
+
+        protected override IObjectMessage DoCreateObjectMessage()
+        {
+            throw new NotSupportedException("MQTT Cannot process Object Messages");
+        }
+
+        protected override IDestination DoTransformDestination(IDestination destination)
+        {
+            return MQTTDestination.Transform(destination);
+        }
+
+        protected override void DoPostProcessMessage(IMessage message)
+        {
+        }
+
+        #endregion
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Util
+{
+    /// <summary>
+    /// Defines an interface for a Message Channel used to dispatch incoming
+    /// Messages to a Session or MessageConsumer.  The implementation controls
+    /// how the messages are dequeued from the channel, one option is for a
+    /// FIFO ordering while another might be to sort the Message's based on the
+    /// set Message Priority.
+    /// </summary>
+    public interface MessageDispatchChannel
+    {
+        object SyncRoot
+        {
+            get;
+        }
+        
+        bool Closed
+        {
+            get;
+            set;
+        }
+
+        bool Running
+        {
+            get;
+            set;
+        }
+
+        bool Empty
+        {
+            get;
+        }
+
+        long Count
+        {
+            get;
+        }
+
+        void Start();
+
+        void Stop();
+
+        void Close();
+        
+        void Enqueue(MessageDispatch dispatch);
+
+        void EnqueueFirst(MessageDispatch dispatch);
+
+        MessageDispatch Dequeue(TimeSpan timeout);
+
+        MessageDispatch DequeueNoWait();
+
+        MessageDispatch Peek();
+
+        void Clear();
+
+        MessageDispatch[] RemoveAll();
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
------------------------------------------------------------------------------
    svn:eol-style = native