You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/15 00:39:24 UTC

svn commit: r1542120 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk: ./ src/main/csharp/ src/main/csharp/Messages/ src/main/csharp/Threads/ src/main/csharp/Transport/

Author: tabish
Date: Thu Nov 14 23:39:24 2013
New Revision: 1542120

URL: http://svn.apache.org/r1542120
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Starting to implement the framework 

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs?rev=1542120&r1=1542119&r2=1542120&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs Thu Nov 14 23:39:24 2013
@@ -17,26 +17,22 @@
 
 using System;
 
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.MQTT
 {
-
 	/// <summary>
 	/// Exception thrown when an IO error occurs
 	/// </summary>
 	public class IOException : NMSException
 	{
-		public IOException()
-			: base("IO Exception failed with missing exception log")
+		public IOException() : base("IO Exception failed with missing exception log")
 		{
 		}
 
-		public IOException(String msg)
-			: base(msg)
+		public IOException(String msg) : base(msg)
 		{
 		}
 
-		public IOException(String msg, Exception inner)
-			: base(msg, inner)
+		public IOException(String msg, Exception inner) : base(msg, inner)
 		{
 		}
 	}

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT.Commands
+{
+	public class BytesMessage : MQTTMessage, IBytesMessage
+	{
+		private EndianBinaryReader dataIn = null;
+		private EndianBinaryWriter dataOut = null;
+		private MemoryStream outputBuffer = null;
+		private int length = 0;
+
+		public override Object Clone()
+		{
+			StoreContent();
+			return base.Clone();
+		}
+
+		public override void OnSend()
+		{
+			base.OnSend();
+			StoreContent();
+		}
+
+		public override void ClearBody()
+		{
+			base.ClearBody();
+			this.outputBuffer = null;
+			this.dataIn = null;
+			this.dataOut = null;
+			this.length = 0;
+		}
+
+		public long BodyLength
+		{
+			get
+			{
+				InitializeReading();
+				return this.length;
+			}
+		}
+
+		public byte ReadByte()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadByte();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteByte( byte value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public bool ReadBoolean()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadBoolean();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteBoolean( bool value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public char ReadChar()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadChar();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteChar( char value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public short ReadInt16()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadInt16();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteInt16( short value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public int ReadInt32()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadInt32();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteInt32( int value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public long ReadInt64()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadInt64();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteInt64( long value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public float ReadSingle()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadSingle();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteSingle( float value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public double ReadDouble()
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.ReadDouble();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteDouble( double value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public int ReadBytes( byte[] value )
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.Read( value, 0, value.Length );
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public int ReadBytes( byte[] value, int length )
+		{
+			InitializeReading();
+			try
+			{
+				return dataIn.Read( value, 0, length );
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteBytes( byte[] value )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value, 0, value.Length );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public void WriteBytes( byte[] value, int offset, int length )
+		{
+			InitializeWriting();
+			try
+			{
+				dataOut.Write( value, offset, length );
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public string ReadString()
+		{
+			InitializeReading();
+			try
+			{
+				// JMS, CMS and NMS all encode the String using a 16 bit size header.
+				return dataIn.ReadString16();
+			}
+			catch(EndOfStreamException e)
+			{
+				throw NMSExceptionSupport.CreateMessageEOFException(e);
+			}
+			catch(IOException e)
+			{
+				throw NMSExceptionSupport.CreateMessageFormatException(e);
+			}
+		}
+
+		public void WriteString( string value )
+		{
+			InitializeWriting();
+			try
+			{
+				// JMS, CMS and NMS all encode the String using a 16 bit size header.
+				dataOut.WriteString16(value);
+			}
+			catch(Exception e)
+			{
+				throw NMSExceptionSupport.Create(e);
+			}
+		}
+
+		public void WriteObject( System.Object value )
+		{
+			InitializeWriting();
+			if( value is System.Byte )
+			{
+				this.dataOut.Write( (byte) value );
+			}
+			else if( value is Char )
+			{
+				this.dataOut.Write( (char) value );
+			}
+			else if( value is Boolean )
+			{
+				this.dataOut.Write( (bool) value );
+			}
+			else if( value is Int16 )
+			{
+				this.dataOut.Write( (short) value );
+			}
+			else if( value is Int32 )
+			{
+				this.dataOut.Write( (int) value );
+			}
+			else if( value is Int64 )
+			{
+				this.dataOut.Write( (long) value );
+			}
+			else if( value is Single )
+			{
+				this.dataOut.Write( (float) value );
+			}
+			else if( value is Double )
+			{
+				this.dataOut.Write( (double) value );
+			}
+			else if( value is byte[] )
+			{
+				this.dataOut.Write( (byte[]) value );
+			}
+			else if( value is String )
+			{
+				this.dataOut.WriteString16( (string) value );
+			}
+			else
+			{
+				throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+			}
+		}
+
+        public new byte[] Content
+        {
+            get
+            {
+				byte[] buffer = null;
+                InitializeReading();
+				if(this.length != 0)
+				{
+                	buffer = new byte[this.length];
+                	this.dataIn.Read(buffer, 0, buffer.Length);
+				}
+				return buffer;
+            }
+
+            set
+            {
+                InitializeWriting();
+				if(null != value)
+				{
+					this.dataOut.Write(value, 0, value.Length);
+				}
+            }
+        }
+
+		public void Reset()
+		{
+			StoreContent();
+			this.dataIn = null;
+			this.dataOut = null;
+			this.outputBuffer = null;
+			this.ReadOnlyBody = true;
+		}
+
+		private void InitializeReading()
+		{
+			FailIfWriteOnlyBody();
+			if(this.dataIn == null)
+			{
+                byte[] data = base.Content;
+                
+				if(base.Content == null)
+				{
+					data = new byte[0];
+				}
+                
+                Stream target = new MemoryStream(data, false);
+                
+                if(this.Connection != null && this.Compressed == true)
+                {
+                    EndianBinaryReader reader = new EndianBinaryReader(target);
+                    this.length = reader.ReadInt32();
+                    
+                    target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
+                }
+                else
+                {
+                    this.length = data.Length;
+                }
+                
+				this.dataIn = new EndianBinaryReader(target);
+			}
+		}
+
+		private void InitializeWriting()
+		{
+			FailIfReadOnlyBody();
+			if(this.dataOut == null)
+			{
+				this.outputBuffer = new MemoryStream();
+                Stream target = this.outputBuffer;
+
+                if(this.Connection != null && this.Connection.UseCompression)
+                {
+                    this.length = 0;
+					this.Compressed = true;
+
+                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);                    
+                    target = new LengthTrackerStream(target, this);
+                }
+                
+				this.dataOut = new EndianBinaryWriter(target);
+			}
+		}
+
+		private void StoreContent()
+		{
+			if(this.dataOut != null)
+			{
+                if(this.Compressed == true)
+                {
+                    MemoryStream final = new MemoryStream();
+                    EndianBinaryWriter writer = new EndianBinaryWriter(final);                    
+
+                    this.dataOut.Close();
+
+                    writer.Write(this.length);
+					if(this.length > 0)
+					{
+						byte[] compressed = this.outputBuffer.ToArray();
+						writer.Write(compressed, 0, compressed.Length);
+					}
+                    writer.Close();
+                    
+                    base.Content = final.ToArray();
+                }
+                else
+                {
+                    this.dataOut.Close();
+                    base.Content = outputBuffer.ToArray();
+                }
+
+				this.dataOut = null;
+				this.outputBuffer = null;
+			}
+		}
+
+        /// <summary>
+        /// Used when the message compression is enabled to track how many bytes
+        /// the EndianBinaryWriter actually writes to the stream before compression
+        /// so that the receiving client can read off the real bodylength from the
+        /// Message before the data is actually read.
+        /// </summary>
+        private class LengthTrackerStream : Stream
+        {
+            private readonly ActiveMQBytesMessage parent;
+            private readonly Stream sink;
+
+            public LengthTrackerStream(Stream sink, ActiveMQBytesMessage parent) : base()
+            {
+                this.sink = sink;
+                this.parent = parent;
+            }
+
+            public override void Close()
+            {
+                this.sink.Close();
+                base.Close();
+            }
+
+            public override long Position
+            {
+                get { return this.sink.Position; }
+                set { this.sink.Position = value; }
+            }
+            
+            public override long Length
+            {
+                get { return this.sink.Length; }
+            }
+            
+            public override bool CanSeek
+            { 
+                get { return this.sink.CanSeek; } 
+            }
+            
+            public override bool CanRead 
+            { 
+                get { return this.sink.CanRead; } 
+            }
+
+            public override bool CanWrite
+            {
+                get { return this.sink.CanWrite; }
+            }
+
+            public override int ReadByte()
+            {
+                return this.sink.ReadByte();
+            }
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                return this.sink.Read(buffer, offset, count);
+            }
+            
+            public override void WriteByte(byte value)
+            {
+                this.parent.length++;
+                this.sink.WriteByte(value);
+            }
+            
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+				if(null != buffer)
+				{
+					this.parent.length += count;
+					this.sink.Write(buffer, offset, count);
+				}
+            }
+
+            public override void Flush()
+            {
+                this.sink.Flush();
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                return this.sink.Seek(offset, origin);
+            }
+
+            public override void SetLength(long value)
+            {
+                this.sink.SetLength(value);
+            }
+        }
+        
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Util;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Messages
+{
+	public delegate void AcknowledgeHandler(MQTTMessage message);
+
+	public class MQTTMessage : IMessage
+	{
+		private readonly PUBLISH publish = new PUBLISH();
+		private MessagePropertyIntercepter propertyHelper;
+		private PrimitiveMap properties;
+		private Connection connection;
+
+		public event AcknowledgeHandler Acknowledger;
+
+		public static MQTTMessage Transform(IMessage message)
+		{
+			return (MQTTMessage) message;
+		}
+
+		public MQTTMessage() : base()
+		{
+			Timestamp = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
+		}
+
+        public override int GetHashCode()
+        {
+            MessageId id = this.MessageId;
+
+            return id != null ? id.GetHashCode() : base.GetHashCode();
+        }
+
+	    public override byte GetDataStructureType()
+		{
+			return ID_ACTIVEMQMESSAGE;
+		}
+
+		public override object Clone()
+		{
+			MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
+
+			cloneMessage.propertyHelper = new MessagePropertyIntercepter(cloneMessage, cloneMessage.properties, this.ReadOnlyProperties) { AllowByteArrays = false };
+			return cloneMessage;
+		}
+
+        public override bool Equals(object that)
+        {
+            if(that is MQTTMessage)
+            {
+                return Equals((MQTTMessage) that);
+            }
+            return false;
+        }
+
+        public virtual bool Equals(MQTTMessage that)
+        {
+            MessageId oMsg = that.MessageId;
+            MessageId thisMsg = this.MessageId;
+            
+            return thisMsg != null && oMsg != null && oMsg.Equals(thisMsg);
+        }
+        
+		public void Acknowledge()
+		{
+		    if(null == Acknowledger)
+			{
+				throw new NMSException("No Acknowledger has been associated with this message: " + this);
+			}
+		    
+            Acknowledger(this);
+		}
+
+	    public virtual void ClearBody()
+		{
+			this.ReadOnlyBody = false;
+			this.Content = null;
+		}
+
+		public virtual void ClearProperties()
+		{
+			this.MarshalledProperties = null;
+			this.ReadOnlyProperties = false;
+			this.Properties.Clear();
+		}
+
+		protected void FailIfReadOnlyBody()
+		{
+			if(ReadOnlyBody == true)
+			{
+				throw new MessageNotWriteableException("Message is in Read-Only mode.");
+			}
+		}
+
+		protected void FailIfWriteOnlyBody()
+		{
+			if( ReadOnlyBody == false )
+			{
+				throw new MessageNotReadableException("Message is in Write-Only mode.");
+			}
+		}
+
+        public override bool ReadOnlyProperties
+        {
+            get{ return base.ReadOnlyProperties; }
+            
+            set
+            {
+                if(this.propertyHelper != null)
+                {
+                    this.propertyHelper.ReadOnly = value;
+                }
+                base.ReadOnlyProperties = value;
+            }
+        }
+        
+		#region Properties
+
+		public IPrimitiveMap Properties
+		{
+			get
+			{
+				if(null == properties)
+				{
+					properties = PrimitiveMap.Unmarshal(MarshalledProperties);
+					propertyHelper = new MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties)
+					                     {AllowByteArrays = false};
+					
+					// Since JMS doesn't define a Byte array interface for properties we
+					// disable them here to prevent sending invalid data to the broker.
+				}
+
+				return propertyHelper;
+			}
+		}
+
+		public IDestination FromDestination
+		{
+			get { return Destination; }
+			set { this.Destination = ActiveMQDestination.Transform(value); }
+		}
+		
+		public Connection Connection
+		{
+			get { return this.connection; }
+			set { this.connection = value; }
+		}
+
+		/// <summary>
+		/// The correlation ID used to correlate messages with conversations or long running business processes
+		/// </summary>
+		public string NMSCorrelationID
+		{
+			get { return CorrelationId; }
+			set { CorrelationId = value; }
+		}
+
+		/// <summary>
+		/// The destination of the message
+		/// </summary>
+		public IDestination NMSDestination
+		{
+			get { return Destination; }
+            set { Destination = value as ActiveMQDestination; }
+		}
+
+		private TimeSpan timeToLive = TimeSpan.FromMilliseconds(0);
+		/// <summary>
+		/// The time in milliseconds that this message should expire in
+		/// </summary>
+		public TimeSpan NMSTimeToLive
+		{
+			get
+			{
+				if(Expiration > 0 && timeToLive.TotalMilliseconds <= 0.0)
+				{
+					timeToLive = TimeSpan.FromMilliseconds(Expiration - Timestamp);
+				}
+
+				return timeToLive;
+			}
+
+			set
+			{
+				timeToLive = value;
+				if(timeToLive.TotalMilliseconds > 0)
+				{
+					Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
+				}
+				else
+				{
+					Expiration = 0;
+				}
+			}
+		}
+
+		/// <summary>
+		/// The message ID which is set by the provider
+		/// </summary>
+		public string NMSMessageId
+		{
+			get
+			{
+			    return null != MessageId ? BaseDataStreamMarshaller.ToString(MessageId) : String.Empty;
+			}
+
+		    set
+            {
+                if(value != null) 
+                {
+                    try 
+                    {
+                        MessageId id = new MessageId(value);
+                        this.MessageId = id;
+                    } 
+                    catch(FormatException) 
+                    {
+                        // we must be some foreign JMS provider or strange user-supplied
+                        // String so lets set the IDs to be 1
+                        MessageId id = new MessageId();
+                        this.MessageId = id;
+                    }
+                } 
+                else
+                {
+                    this.MessageId = null;
+                }
+            }
+		}
+
+		/// <summary>
+		/// Whether or not this message is persistent
+		/// </summary>
+		public MsgDeliveryMode NMSDeliveryMode
+		{
+			get { return (Persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
+			set { Persistent = (MsgDeliveryMode.Persistent == value); }
+		}
+
+		/// <summary>
+		/// The Priority on this message
+		/// </summary>
+		public MsgPriority NMSPriority
+		{
+			get { return (MsgPriority) Priority; }
+			set { Priority = (byte) value; }
+		}
+
+		/// <summary>
+		/// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully.
+		/// </summary>
+		public bool NMSRedelivered
+		{
+			get { return (RedeliveryCounter > 0); }
+
+            set
+            {
+                if(value == true)
+                {
+                    if(this.RedeliveryCounter <= 0)
+                    {
+                        this.RedeliveryCounter = 1;
+                    }
+                }
+                else
+                {
+                    if(this.RedeliveryCounter > 0)
+                    {
+                        this.RedeliveryCounter = 0;
+                    }
+                }
+            }
+		}
+
+		/// <summary>
+		/// The destination that the consumer of this message should send replies to
+		/// </summary>
+		public IDestination NMSReplyTo
+		{
+			get { return ReplyTo; }
+			set { ReplyTo = ActiveMQDestination.Transform(value); }
+		}
+
+		/// <summary>
+		/// The timestamp the broker added to the message
+		/// </summary>
+		public DateTime NMSTimestamp
+		{
+			get { return DateUtils.ToDateTime(Timestamp); }
+			set
+			{
+				Timestamp = DateUtils.ToJavaTimeUtc(value);
+				if(timeToLive.TotalMilliseconds > 0)
+				{
+					Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
+				}
+			}
+		}
+
+		/// <summary>
+		/// The type name of this message
+		/// </summary>
+		public string NMSType
+		{
+			get { return Type; }
+			set { Type = value; }
+		}
+
+		#endregion
+
+		#region NMS Extension headers
+
+		/// <summary>
+		/// Returns the number of times this message has been redelivered to other consumers without being acknowledged successfully.
+		/// </summary>
+		public int NMSXDeliveryCount
+		{
+			get { return RedeliveryCounter + 1; }
+		}
+
+		/// <summary>
+		/// The Message Group ID used to group messages together to the same consumer for the same group ID value
+		/// </summary>
+		public string NMSXGroupID
+		{
+			get { return GroupID; }
+			set { GroupID = value; }
+		}
+		/// <summary>
+		/// The Message Group Sequence counter to indicate the position in a group
+		/// </summary>
+		public int NMSXGroupSeq
+		{
+			get { return GroupSequence; }
+			set { GroupSequence = value; }
+		}
+
+		/// <summary>
+		/// Returns the ID of the producers transaction
+		/// </summary>
+		public string NMSXProducerTXID
+		{
+			get
+			{
+				TransactionId txnId = OriginalTransactionId;
+				if(null == txnId)
+				{
+					txnId = TransactionId;
+				}
+
+				if(null != txnId)
+				{
+					return BaseDataStreamMarshaller.ToString(txnId);
+				}
+
+				return String.Empty;
+			}
+		}
+
+		#endregion
+
+		public object GetObjectProperty(string name)
+		{
+			return Properties[name];
+		}
+
+		public void SetObjectProperty(string name, object value)
+		{
+			Properties[name] = value;
+		}
+
+		// MarshallAware interface
+		public override bool IsMarshallAware()
+		{
+			return true;
+		}
+
+		public override void BeforeMarshall(OpenWireFormat wireFormat)
+		{
+			MarshalledProperties = null;
+			if(properties != null)
+			{
+				MarshalledProperties = properties.Marshal();
+			}
+		}
+
+		public override Response Visit(ICommandVisitor visitor)
+		{
+			return visitor.ProcessMessage(this);
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT.Messages
+{
+    /*
+     *
+     *  Command code for OpenWire format for MessageDispatch
+     *
+     *  NOTE!: This file is auto generated - do not modify!
+     *         if you need to make a change, please see the Java Classes
+     *         in the nms-activemq-openwire-generator module
+     *
+     */
+    public class MessageDispatch : BaseCommand
+    {
+        ConsumerId consumerId;
+        ActiveMQDestination destination;
+        Message message;
+        int redeliveryCounter;
+
+        ///
+        /// <summery>
+        ///  Returns a string containing the information for this DataStructure
+        ///  such as its type and value of its elements.
+        /// </summery>
+        ///
+        public override string ToString()
+        {
+            return GetType().Name + "[ " + 
+                "commandId = " + this.CommandId + ", " + 
+                "responseRequired = " + this.ResponseRequired + ", " + 
+                "ConsumerId = " + ConsumerId + ", " + 
+                "Destination = " + Destination + ", " + 
+                "Message = " + Message + ", " + 
+                "RedeliveryCounter = " + RedeliveryCounter + " ]";
+        }
+
+        public Exception RollbackCause
+        {
+            get { return this.rollbackCause; }
+            set { this.rollbackCause = value; }
+        }
+
+        public ConsumerId ConsumerId
+        {
+            get { return consumerId; }
+            set { this.consumerId = value; }
+        }
+
+        public ActiveMQDestination Destination
+        {
+            get { return destination; }
+            set { this.destination = value; }
+        }
+
+        public Message Message
+        {
+            get { return message; }
+            set { this.message = value; }
+        }
+
+        public int RedeliveryCounter
+        {
+            get { return redeliveryCounter; }
+            set { this.redeliveryCounter = value; }
+        }
+
+        public override int GetHashCode()
+        {
+            int answer = 0;
+
+            answer = (answer * 37) + HashCode(ConsumerId);
+            answer = (answer * 37) + HashCode(Destination);
+            answer = (answer * 37) + HashCode(Message);
+            answer = (answer * 37) + HashCode(RedeliveryCounter);
+
+            return answer;
+        }
+
+        public override bool Equals(object that)
+        {
+            if(that is MessageDispatch)
+            {
+                return Equals((MessageDispatch) that);
+            }
+
+            return false;
+        }
+
+        public virtual bool Equals(MessageDispatch that)
+        {
+            if(!Equals(this.ConsumerId, that.ConsumerId))
+            {
+                return false;
+            }
+            if(!Equals(this.Destination, that.Destination))
+            {
+                return false;
+            }
+            if(!Equals(this.Message, that.Message))
+            {
+                return false;
+            }
+            if(!Equals(this.RedeliveryCounter, that.RedeliveryCounter))
+            {
+                return false;
+            }
+
+            return true;
+        }
+        ///
+        /// <summery>
+        ///  Return an answer of true to the isMessageDispatch() query.
+        /// </summery>
+        ///
+        public override bool IsMessageDispatch
+        {
+            get { return true; }
+        }
+    };
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single thread to running a single Task.
+    /// </summary>
+    public class DedicatedTaskRunner : TaskRunner
+    {
+        private readonly Mutex mutex = new Mutex();
+        private readonly Thread theThread = null;
+        private readonly Task task = null;
+
+        private bool terminated = false;
+        private bool pending = false;
+        private bool shutdown = false;
+
+        public DedicatedTaskRunner(Task task)
+            : this(task, "MQTT Task", ThreadPriority.Normal)
+        {
+        }
+
+        public DedicatedTaskRunner(Task task, string taskName, ThreadPriority taskPriority)
+        {
+            if(task == null)
+            {
+                throw new NullReferenceException("Task was null");
+            }
+
+            this.task = task;
+
+            this.theThread = new Thread(Run);
+            this.theThread.IsBackground = true;
+            this.theThread.Priority = taskPriority;
+            this.theThread.Name = taskName;
+            this.theThread.Start();
+        }
+
+        public void Shutdown(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // Wait till the thread stops ( no need to wait if shutdown
+                // is called from thread that is shutting down)
+                if(Thread.CurrentThread != this.theThread && !this.terminated)
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+            }
+        }
+
+        public void Shutdown()
+        {
+            this.Shutdown(TimeSpan.FromMilliseconds(-1));
+        }
+
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // Wait till the thread stops ( no need to wait if shutdown
+                // is called from thread that is shutting down)
+                if(Thread.CurrentThread != this.theThread && !this.terminated)
+                {
+                    Monitor.Wait(this.mutex, timeout);
+
+                    if(!this.terminated)
+                    {
+                        theThread.Abort();
+                    }
+                }
+            }
+        }
+
+        public void Wakeup()
+        {
+            lock(mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        internal void Run()
+        {
+            try
+            {
+                while(true)
+                {
+                    lock(this.mutex)
+                    {
+                        pending = false;
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.task.Iterate())
+                    {
+                        // wait to be notified.
+                        lock(this.mutex)
+                        {
+                            if(this.shutdown)
+                            {
+                                return;
+                            }
+
+                            while(!this.pending)
+                            {
+                                Monitor.Wait(this.mutex);
+                            }
+                        }
+                    }
+                }
+            }
+            catch(ThreadAbortException)
+            {
+                // Prevent the ThreadAbortedException for propogating.
+                Thread.ResetAbort();
+            }
+            catch
+            {
+            }
+            finally
+            {
+                // Make sure we notify any waiting threads that thread
+                // has terminated.
+                lock(this.mutex)
+                {
+                    this.terminated = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+
+namespace Apache.NMS.MQTT.Threads
+{
+	/// <summary>
+	/// Scheduler Service useful for running various delayed units of work.
+	/// </summary>
+	public class Scheduler
+	{
+		private object syncRoot = new object();
+    	private readonly String name;
+    	private TimerEx timer;
+    	private readonly Dictionary<Object, TimerTask> timerTasks = new Dictionary<object, TimerTask>();
+		private bool started = false;
+
+		public Scheduler(String name)
+		{
+			this.name = name;
+		}
+
+		/// <summary>
+		/// Executes the given task periodically using a fixed-delay execution style 
+		/// which prevents tasks from bunching should there be some delay such as
+		/// garbage collection or machine sleep.  
+		/// 
+		/// This repeating unit of work can later be cancelled using the WaitCallback
+		/// that was originally used to initiate the processing.
+		/// </summary>
+	    public void ExecutePeriodically(WaitCallback task, object arg, int period) 
+		{
+			lock (this.syncRoot)
+			{
+				CheckStarted();
+	        	TimerTask timerTask = timer.Schedule(task, arg, period, period);
+	        	timerTasks.Add(task, timerTask);
+			}
+	    }
+
+		/// <summary>
+		/// Executes the given task periodically using a fixed-delay execution style 
+		/// which prevents tasks from bunching should there be some delay such as
+		/// garbage collection or machine sleep.  
+		/// 
+		/// This repeating unit of work can later be cancelled using the WaitCallback
+		/// that was originally used to initiate the processing.
+		/// </summary>
+	    public void ExecutePeriodically(WaitCallback task, object arg, TimeSpan period) 
+		{
+			lock (this.syncRoot)
+			{
+				CheckStarted();
+		        TimerTask timerTask = timer.Schedule(task, arg, period, period);
+		        timerTasks.Add(task, timerTask);
+			}
+	    }
+
+		/// <summary>
+		/// Executes the given task the after delay, no reference is kept for this
+		/// task so it cannot be cancelled later.  
+		/// </summary>
+	    public void ExecuteAfterDelay(WaitCallback task, object arg, int delay) 
+		{
+			lock (this.syncRoot)
+			{
+				CheckStarted();
+			}
+
+			timer.Schedule(task, arg, delay);
+	    }
+
+		/// <summary>
+		/// Executes the given task the after delay, no reference is kept for this
+		/// task so it cannot be cancelled later.  
+		/// </summary>
+	    public void ExecuteAfterDelay(WaitCallback task, object arg, TimeSpan delay) 
+		{
+			lock (this.syncRoot)
+			{
+				CheckStarted();
+			}
+
+			timer.Schedule(task, arg, delay);
+	    }
+
+	    public void Cancel(object task) 
+		{
+			lock (this.syncRoot)
+			{
+				if (timerTasks.ContainsKey(task))
+				{
+	        		TimerTask ticket = timerTasks[task];
+	        		if (ticket != null) 
+					{
+			            ticket.Cancel();
+	            		timer.Purge(); // remove cancelled TimerTasks
+	        		}
+
+					timerTasks.Remove(task);
+				}
+			}
+	    }
+
+	    public void Start()
+		{
+			lock (this.syncRoot)
+			{
+	        	this.timer = new TimerEx(name, true);
+				this.started = true;
+			}
+	    }
+
+	    public void Stop() 
+		{
+			lock (this.syncRoot)
+			{
+				this.started = false;
+	        	if (this.timer != null)
+            	{
+	            	this.timer.Cancel();
+	        	}
+			}
+	    }
+
+		public String Name
+		{
+			get { return this.name; }
+		}
+
+		public bool Started
+		{
+			get 
+			{ 
+				lock (this.syncRoot)
+				{
+					return this.started; 
+				}
+			}
+		}
+
+	    public override string ToString()
+		{
+			return string.Format("[Scheduler][{0}]", name);
+		}
+
+		private void CheckStarted()
+		{
+			if (!this.started)
+			{
+				throw new InvalidOperationException("The Schedular has not been started yet");
+			}
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MQTT
+{
+	/// <summary>
+	/// Represents a task that may take a few iterations to complete.
+	/// </summary>
+	public interface Task
+	{
+		/// <summary>
+		/// Performs some portion of the work that this Task object is
+		/// assigned to complete.  When the task is entirely finished this
+		/// method should return false. 
+		/// </summary>
+		/// <returns>
+		/// A <see cref="System.Boolean"/> this indicates if this Task should 
+		/// be run again.
+		/// </returns>
+		bool Iterate();
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT.Threads
+{
+	/// <summary>
+	/// Allows you to request a thread execute the associated Task.
+	/// </summary>
+	public interface TaskRunner
+	{
+        /// <summary>
+        /// Wakeup the TaskRunner and have it check for any pending work that
+        /// needs to be completed.  If none is found it will go back to sleep
+        /// until another Wakeup call is made.
+        /// </summary>
+		void Wakeup();
+
+        /// <summary>
+        /// Attempt to Shutdown the TaskRunner, this method will wait indefinitely
+        /// for the TaskRunner to quite if the task runner is in a call to its Task's
+        /// run method and that never returns.
+        /// </summary>
+		void Shutdown();
+
+        /// <summary>
+        /// Performs a timed wait for the TaskRunner to shutdown.  If the TaskRunner
+        /// is in a call to its Task's run method and that does not return before the
+        /// timeout expires this method returns and the TaskRunner may remain in the
+        /// running state.
+        /// </summary>
+        /// <param name="timeout">
+        /// A <see cref="TimeSpan"/>
+        /// </param>
+		void Shutdown(TimeSpan timeout);
+
+        /// <summary>
+        /// Performs a timed wait for the TaskRunner to shutdown.  If the TaskRunner
+        /// is in a call to its Task's run method and that does not return before the
+        /// timeout expires this method sends an Abort to the Task thread and return.
+        /// </summary>
+        /// <param name="timeout">
+        /// A <see cref="TimeSpan"/>
+        /// </param>
+        void ShutdownWithAbort(TimeSpan timeout);
+
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+	/// <summary>
+	/// Manages the thread pool for long running tasks. Long running tasks are not
+	/// always active but when they are active, they may need a few iterations of
+	/// processing for them to become idle. The manager ensures that each task is
+	/// processes but that no one task overtakes the system. This is kina like
+	/// cooperative multitasking.
+    ///
+    /// If your OS/JVM combination has a good thread model, you may want to avoid
+    /// using a thread pool to run tasks and use a DedicatedTaskRunner instead.
+    /// </summary>
+	public class TaskRunnerFactory
+	{
+		public string name = "MQTT Task";
+        public ThreadPriority priority = ThreadPriority.Normal;
+        public int maxIterationsPerRun = 1000;
+        public bool dedicatedTaskRunner = true;
+
+		public TaskRunnerFactory()
+		{
+		}
+
+        public TaskRunner CreateTaskRunner(Task task)
+        {
+            return CreateTaskRunner(task, this.name);
+        }
+
+        public TaskRunner CreateTaskRunner(Task task, string name)
+        {
+            return CreateTaskRunner(task, name, this.priority);
+        }
+
+		public TaskRunner CreateTaskRunner(Task task, string name, ThreadPriority taskPriority)
+		{
+            if(this.dedicatedTaskRunner)
+            {
+                return new DedicatedTaskRunner(task, name, taskPriority);
+            }
+            else
+            {
+                return new PooledTaskRunner(task, this.maxIterationsPerRun);
+            }
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.MQTT.Transport
+{
+    /// <summary>
+    /// An Transport Command Marker Interface
+    /// </summary>
+    public interface Command : ICloneable
+    {
+        int CommandId
+        {
+            get;
+            set;
+        }
+
+        bool ResponseRequired
+        {
+            get;
+            set;
+        }
+
+        bool IsDestinationInfo
+        {
+            get;
+        }
+
+        bool IsKeepAliveInfo
+        {
+            get;
+        }
+
+        bool IsMessage
+        {
+            get;
+        }
+
+        bool IsMessageAck
+        {
+            get;
+        }
+
+        bool IsMessageDispatch
+        {
+            get;
+        }
+
+        bool IsProducerInfo
+        {
+            get;
+        }
+
+        bool IsResponse
+        {
+            get;
+        }
+
+        bool IsSessionInfo
+        {
+            get;
+        }
+
+        bool IsShutdownInfo
+        {
+            get;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1542120&r1=1542119&r2=1542120&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Thu Nov 14 23:39:24 2013
@@ -39,23 +39,46 @@
   <ItemGroup>
     <Compile Include="src\main\csharp\ConnectionFactory.cs" />
     <Compile Include="src\main\csharp\ConnectionMetaData.cs" />
-    <Compile Include="src\main\csharp\commands\CONNECT.cs" />
-    <Compile Include="src\main\csharp\commands\CONNACK.cs" />
-    <Compile Include="src\main\csharp\commands\PUBLISH.cs" />
-    <Compile Include="src\main\csharp\commands\PUBACK.cs" />
-    <Compile Include="src\main\csharp\commands\PUBREC.cs" />
-    <Compile Include="src\main\csharp\commands\PUBREL.cs" />
-    <Compile Include="src\main\csharp\commands\SUBSCRIBE.cs" />
-    <Compile Include="src\main\csharp\commands\SUBACK.cs" />
-    <Compile Include="src\main\csharp\commands\UNSUBSCRIBE.cs" />
-    <Compile Include="src\main\csharp\commands\UNSUBACK.cs" />
-    <Compile Include="src\main\csharp\commands\PINGREQ.cs" />
-    <Compile Include="src\main\csharp\commands\PINGRESP.cs" />
-    <Compile Include="src\main\csharp\commands\DISCONNECT.cs" />
+    <Compile Include="src\main\csharp\Connection.cs" />
+    <Compile Include="src\main\csharp\CommonAssemblyInfo.cs" />
+    <Compile Include="src\main\csharp\Session.cs" />
+    <Compile Include="src\main\csharp\MessageConsumer.cs" />
+    <Compile Include="src\main\csharp\MessageProducer.cs" />
+    <Compile Include="src\main\csharp\Commands\CONNACK.cs" />
+    <Compile Include="src\main\csharp\Commands\CONNECT.cs" />
+    <Compile Include="src\main\csharp\Commands\DISCONNECT.cs" />
+    <Compile Include="src\main\csharp\Commands\PINGREQ.cs" />
+    <Compile Include="src\main\csharp\Commands\PINGRESP.cs" />
+    <Compile Include="src\main\csharp\Commands\PUBACK.cs" />
+    <Compile Include="src\main\csharp\Commands\PUBLISH.cs" />
+    <Compile Include="src\main\csharp\Commands\PUBREC.cs" />
+    <Compile Include="src\main\csharp\Commands\PUBREL.cs" />
+    <Compile Include="src\main\csharp\Commands\SUBACK.cs" />
+    <Compile Include="src\main\csharp\Commands\SUBSCRIBE.cs" />
+    <Compile Include="src\main\csharp\Commands\UNSUBACK.cs" />
+    <Compile Include="src\main\csharp\Commands\UNSUBSCRIBE.cs" />
+    <Compile Include="src\main\csharp\Threads\ThreadPoolExecutor.cs" />
+    <Compile Include="src\main\csharp\Transport\ITransport.cs" />
+    <Compile Include="src\main\csharp\Transport\ITransportFactory.cs" />
+    <Compile Include="src\main\csharp\Util\MQTTMessageTransformation.cs" />
+    <Compile Include="src\main\csharp\ConnectionClosedException.cs" />
+    <Compile Include="src\main\csharp\IDispatcher.cs" />
+    <Compile Include="src\main\csharp\IOException.cs" />
+    <Compile Include="src\main\csharp\SessionExecutor.cs" />
+    <Compile Include="src\main\csharp\Util\FifoMessageDispatchChannel.cs" />
+    <Compile Include="src\main\csharp\Util\MessageDispatchChannel.cs" />
+    <Compile Include="src\main\csharp\Threads\DedicatedTaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\Scheduler.cs" />
+    <Compile Include="src\main\csharp\Threads\Task.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" />
+    <Compile Include="src\main\csharp\Messages\MessageDispatch.cs" />
+    <Compile Include="src\main\csharp\Messages\BytesMessage.cs" />
+    <Compile Include="src\main\csharp\Messages\MQTTMessage.cs" />
+    <Compile Include="src\main\csharp\Transport\Command.cs" />
   </ItemGroup>
   <ItemGroup>
     <Folder Include="keyfile\" />
-    <Folder Include="src\main\csharp\commands\" />
   </ItemGroup>
   <ItemGroup>
     <None Include="LICENSE.txt" />