You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/16 00:43:14 UTC

svn commit: r1542430 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk: ./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Messages/ src/main/csharp/Threads/ src/main/csharp/Transport/ src/main/csharp/Util/

Author: tabish
Date: Fri Nov 15 23:43:13 2013
New Revision: 1542430

URL: http://svn.apache.org/r1542430
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/FutureResponse.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MQTTTransportFactoryAttribute.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Response.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFactory.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FactoryAttribute.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FactoryFinder.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/IdGenerator.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTDestination.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Nov 15 23:43:13 2013
@@ -2,3 +2,4 @@
 build
 lib
 vs2008-mqtt.userprefs
+bin

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs Fri Nov 15 23:43:13 2013
@@ -16,7 +16,7 @@
 //
 using System;
 
-namespace Apache.NMS.MQTT.Command
+namespace Apache.NMS.MQTT.Commands
 {
 	public class CONNACK
 	{
@@ -32,6 +32,17 @@ namespace Apache.NMS.MQTT.Command
 		public CONNACK()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "CONNACK"; }
+		}
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs Fri Nov 15 23:43:13 2013
@@ -31,6 +31,16 @@ namespace Apache.NMS.MQTT.Commands
 		public const byte TYPE = 1;
 		public const String PROTOCOL_NAME = "MQIsdp";
 
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "CONNECT"; }
+		}
+
 		private byte version = 3;
 		public byte Version
 		{
@@ -66,6 +76,13 @@ namespace Apache.NMS.MQTT.Commands
 			set { this.password = value; }
 		}
 
+		private String clientId;
+		public String ClientId
+		{
+			get { return this.clientId; }
+			set { this.clientId = value; }
+		}
+
 		private bool cleanSession;
 		public bool CleanSession
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public DISCONNECT()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "DISCONNECT"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,17 @@ namespace Apache.NMS.MQTT.Commands
 		public PINGREQ()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "PINGREQ"; }
+		}
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public PINGRESP()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "PINGRESP"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public PUBACK()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "PUBACK"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs Fri Nov 15 23:43:13 2013
@@ -31,6 +31,16 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 3;
 
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "PUBLISH"; }
+		}
+
 		private byte qosLevel;
 		public byte QoSLevel
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public PUBREC()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "PUBREC"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public PUBREL()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "PUBREL"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs Fri Nov 15 23:43:13 2013
@@ -31,6 +31,16 @@ namespace Apache.NMS.MQTT.Commands
 		public SUBACK()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "SUBACK"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs Fri Nov 15 23:43:13 2013
@@ -29,6 +29,16 @@ namespace Apache.NMS.MQTT.Commands
 		public SUBSCRIBE()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "SUBSCRIBE"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public UNSUBACK()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "UNSUBACK"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
 		public UNSUBSCRIBE()
 		{
 		}
+
+		public int CommandType
+		{
+			get { return TYPE; }
+		}
+
+		public string CommandName // Textual name of this command.
+		{
+			get { return "UNSUBSCRIBE"; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Fri Nov 15 23:43:13 2013
@@ -15,13 +15,23 @@
 // limitations under the License.
 //
 using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.Util;
 using Apache.NMS.MQTT.Transport;
+using Apache.NMS.MQTT.Threads;
 using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
 
 namespace Apache.NMS.MQTT
 {
 	public class Connection : IConnection
 	{
+		private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+		private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(Timeout.Infinite);
+
+		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 		private readonly CONNECT info = null;
 		private ITransport transport;
 		private readonly Uri brokerUri;
@@ -32,15 +42,19 @@ namespace Apache.NMS.MQTT
         private readonly Atomic<bool> closed = new Atomic<bool>(false);
         private readonly Atomic<bool> closing = new Atomic<bool>(false);
         private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+		private readonly object connectedLock = new object();
         private Exception firstFailureError = null;
+		private bool userSpecifiedClientID;
         private int sessionCounter = 0;
         private readonly Atomic<bool> started = new Atomic<bool>(false);
         private ConnectionMetaData metaData = null;
         private bool disposed = false;
+		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout; // from connection factory
         private readonly MessageTransformation messageTransformation;
         private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+		private readonly IdGenerator clientIdGenerator;
 
-		public Connection(Uri connectionUri, ITransport transport)
+		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
 			this.brokerUri = connectionUri;
 			this.clientIdGenerator = clientIdGenerator;
@@ -100,6 +114,34 @@ namespace Apache.NMS.MQTT
 			set { this.info.Password = value; }
 		}
 
+        public string ClientId
+        {
+            get { return info.ClientId; }
+            set
+            {
+                if(this.connected.Value)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+
+                this.info.ClientId = value;
+                this.userSpecifiedClientID = true;
+                CheckConnected();
+            }
+        }
+
+		/// <summary>
+		/// The Default Client Id used if the ClientId property is not set explicitly.
+		/// </summary>
+		public string DefaultClientId
+		{
+			set
+			{
+				this.info.ClientId = value;
+				this.userSpecifiedClientID = true;
+			}
+		}
+
 		/// <summary>
 		/// This property sets the acknowledgment mode for the connection.
 		/// The URI parameter connection.ackmode can be set to a string value
@@ -229,7 +271,7 @@ namespace Apache.NMS.MQTT
 		protected virtual Session CreateMQTTSession(AcknowledgementMode ackMode)
 		{
 			CheckConnected();
-			return new Session(this, NextSessionId, ackMode);
+			return new Session(this, ackMode);
 		}
 
 		internal void AddSession(Session session)
@@ -249,41 +291,40 @@ namespace Apache.NMS.MQTT
 			}
 		}
 
-		internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
-		{
-			if(!this.closing.Value)
-			{
-				this.dispatchers.Add(id, dispatcher);
-			}
-		}
-
-		internal void RemoveDispatcher(ConsumerId id)
-		{
-			if(!this.closing.Value)
-			{
-				this.dispatchers.Remove(id);
-			}
-		}
-
-		internal void AddProducer(ProducerId id, MessageProducer producer)
-		{
-			if(!this.closing.Value)
-			{
-				this.producers.Add(id, producer);
-			}
-		}
-
-		internal void RemoveProducer(ProducerId id)
-		{
-			if(!this.closing.Value)
-			{
-				this.producers.Remove(id);
-			}
-		}
+//		internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
+//		{
+//			if(!this.closing.Value)
+//			{
+//				this.dispatchers.Add(id, dispatcher);
+//			}
+//		}
+//
+//		internal void RemoveDispatcher(ConsumerId id)
+//		{
+//			if(!this.closing.Value)
+//			{
+//				this.dispatchers.Remove(id);
+//			}
+//		}
+//
+//		internal void AddProducer(ProducerId id, MessageProducer producer)
+//		{
+//			if(!this.closing.Value)
+//			{
+//				this.producers.Add(id, producer);
+//			}
+//		}
+//
+//		internal void RemoveProducer(ProducerId id)
+//		{
+//			if(!this.closing.Value)
+//			{
+//				this.producers.Remove(id);
+//			}
+//		}
 
 	    internal void RemoveDispatcher(IDispatcher dispatcher) 
 		{
-	        this.connectionAudit.RemoveDispatcher(dispatcher);
 	    }
 
 		public void Close()
@@ -316,6 +357,22 @@ namespace Apache.NMS.MQTT
 			disposed = true;
 		}
 
+		protected void OnCommand(ITransport commandTransport, Command command)
+		{
+		}
+
+		internal void OnTransportException(ITransport source, Exception cause)
+		{
+		}
+
+		protected void OnTransportInterrupted(ITransport sender)
+		{
+		}
+
+		protected void OnTransportResumed(ITransport sender)
+		{
+		}
+
 		protected void CheckClosedOrFailed()
 		{
 			CheckClosed();
@@ -333,6 +390,106 @@ namespace Apache.NMS.MQTT
 			}
 		}
 
+		/// <summary>
+		/// Check and ensure that the connection object is connected.  If it is not
+		/// connected or is closed or closing, a ConnectionClosedException is thrown.
+		/// </summary>
+		internal void CheckConnected()
+		{
+			if(closed.Value)
+			{
+				throw new ConnectionClosedException();
+			}
+
+			if(!connected.Value)
+			{
+				DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
+				int waitCount = 1;
+
+				while(true)
+				{
+					if(Monitor.TryEnter(connectedLock))
+					{
+						try
+						{
+							if(closed.Value || closing.Value)
+							{
+								break;
+							}
+							else if(!connected.Value)
+							{
+								if(!this.userSpecifiedClientID)
+								{
+									this.info.ClientId = this.clientIdGenerator.GenerateId();
+								}
+
+								try
+								{
+									if(null != transport)
+									{
+										// Make sure the transport is started.
+										if(!this.transport.IsStarted)
+										{
+											this.transport.Start();
+										}
+
+										// Send the connection and see if an ack/nak is returned.
+										Response response = transport.Request(this.info, this.RequestTimeout);
+										if(!(response is ExceptionResponse))
+										{
+											connected.Value = true;
+										}
+										else
+										{
+											ExceptionResponse error = response as ExceptionResponse;
+											NMSException exception = CreateExceptionFromBrokerError(error.Exception);
+											if(exception is InvalidClientIDException)
+											{
+												// This is non-recoverable.
+												// Shutdown the transport connection, and re-create it, but don't start it.
+												// It will be started if the connection is re-attempted.
+												this.transport.Stop();
+												ITransport newTransport = TransportFactory.CreateTransport(this.brokerUri);
+												SetTransport(newTransport);
+												throw exception;
+											}
+										}
+									}
+								}
+								catch(BrokerException)
+								{
+									// We Swallow the generic version and throw ConnectionClosedException
+								}
+								catch(NMSException)
+								{
+									throw;
+								}
+							}
+						}
+						finally
+						{
+							Monitor.Exit(connectedLock);
+						}
+					}
+
+					if(connected.Value || closed.Value || closing.Value
+						|| (DateTime.Now > timeoutTime && this.RequestTimeout != InfiniteTimeSpan))
+					{
+						break;
+					}
+
+					// Back off from being overly aggressive.  Having too many threads
+					// aggressively trying to connect to a down broker pegs the CPU.
+					Thread.Sleep(5 * (waitCount++));
+				}
+
+				if(!connected.Value)
+				{
+					throw new ConnectionClosedException();
+				}
+			}
+		}
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs Fri Nov 15 23:43:13 2013
@@ -19,6 +19,8 @@ using System;
 using System.Collections.Specialized;
 using Apache.NMS.Util;
 using Apache.NMS.Policies;
+using Apache.NMS.MQTT.Transport;
+using Apache.NMS.MQTT.Util;
 
 namespace Apache.NMS.MQTT
 {
@@ -36,7 +38,7 @@ namespace Apache.NMS.MQTT
         private string connectionPassword;
         private string clientId;
         private string clientIdPrefix;
-        //private IdGenerator clientIdGenerator;
+        private IdGenerator clientIdGenerator;
 
         private bool copyMessageOnSend = true;
         private bool asyncSend;
@@ -159,23 +161,17 @@ namespace Apache.NMS.MQTT
                     StringDictionary properties = URISupport.ParseQuery(query);
                 
                     StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
-                    StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
                     
                     if(connection != null)
                     {
                         URISupport.SetProperties(this, connection, "connection.");
                     }
-                    
-                    if(nms != null)
-                    {
-                        URISupport.SetProperties(this.PrefetchPolicy, nms, "nms.PrefetchPolicy.");
-                        URISupport.SetProperties(this.RedeliveryPolicy, nms, "nms.RedeliveryPolicy.");
-                    }
 
                     brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
                 }
             }
         }
+
         public string UserName
         {
             get { return connectionUserName; }
@@ -259,29 +255,29 @@ namespace Apache.NMS.MQTT
             }
         }
 
-//        public IdGenerator ClientIdGenerator
-//        {
-//            set { this.clientIdGenerator = value; }
-//            get
-//            {
-//                lock(this)
-//                {
-//                    if(this.clientIdGenerator == null)
-//                    {
-//                        if(this.clientIdPrefix != null)
-//                        {
-//                            this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
-//                        }
-//                        else
-//                        {
-//                            this.clientIdGenerator = new IdGenerator();
-//                        }
-//                    }
-//
-//                    return this.clientIdGenerator;
-//                }
-//            }
-//        }
+        public IdGenerator ClientIdGenerator
+        {
+            set { this.clientIdGenerator = value; }
+            get
+            {
+                lock(this)
+                {
+                    if(this.clientIdGenerator == null)
+                    {
+                        if(this.clientIdPrefix != null)
+                        {
+                            this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
+                        }
+                        else
+                        {
+                            this.clientIdGenerator = new IdGenerator();
+                        }
+                    }
+
+                    return this.clientIdGenerator;
+                }
+            }
+        }
 
         public event ExceptionListener OnException
         {
@@ -313,15 +309,8 @@ namespace Apache.NMS.MQTT
 
         protected virtual void ConfigureConnection(Connection connection)
         {
-            connection.AsyncSend = this.AsyncSend;
-            connection.CopyMessageOnSend = this.CopyMessageOnSend;
-            connection.AlwaysSyncSend = this.AlwaysSyncSend;
-            connection.SendAcksAsync = this.SendAcksAsync;
-            connection.DispatchAsync = this.DispatchAsync;
             connection.AcknowledgementMode = this.acknowledgementMode;
             connection.RequestTimeout = this.requestTimeout;
-            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
             connection.ConsumerTransformer = this.consumerTransformer;
             connection.ProducerTransformer = this.producerTransformer;
         }

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs?rev=1542430&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs Fri Nov 15 23:43:13 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT
+{
+    /// <summary>
+    /// Exception thrown when a connection is used after it has failed in some way.
+    /// </summary>
+    [Serializable]
+    public class ConnectionFailedException : NMSException
+    {
+        public ConnectionFailedException()
+            : base("The connection has failed!")
+        {
+        }
+
+        public ConnectionFailedException(string message)
+            : base(message)
+        {
+        }
+
+        public ConnectionFailedException(string message, string errorCode)
+            : base(message, errorCode)
+        {
+        }
+
+        public ConnectionFailedException(string message, Exception innerException)
+            : base(message, innerException)
+        {
+        }
+
+        public ConnectionFailedException(string message, string errorCode, Exception innerException)
+            : base(message, errorCode, innerException)
+        {
+        }
+
+        #region ISerializable interface implementation
+
+        /// <summary>
+        /// Initializes a new instance of the ConnectionFailedException class with serialized data.
+        /// Throws System.ArgumentNullException if the info parameter is null.
+        /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+        /// </summary>
+        /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+        /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+        protected ConnectionFailedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+            : base(info, context)
+        {
+        }
+
+        #endregion
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs Fri Nov 15 23:43:13 2013
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
 
 namespace Apache.NMS.MQTT
 {

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Fri Nov 15 23:43:13 2013
@@ -18,7 +18,7 @@ using System;
 using System.Threading;
 using System.Collections.Generic;
 using System.Collections.Specialized;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
 using Apache.NMS.MQTT.Util;
 using Apache.NMS.MQTT.Threads;
 using Apache.NMS.Util;
@@ -27,6 +27,7 @@ namespace Apache.NMS.MQTT
 {
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
+        private readonly Session session;
         private readonly MessageTransformation messageTransformation;
         private readonly MessageDispatchChannel unconsumedMessages;
         private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
@@ -61,8 +62,111 @@ namespace Apache.NMS.MQTT
 			set { this.consumerTransformer = value; }
 		}
 
+
+		/// <summary>Registers an asynchronous listener; throws NMSException if the consumer is closed.</summary>
+		public event MessageListener Listener
+		{
+			add
+			{
+				CheckClosed();
+				// Pause a running session while the listener is attached and the
+				// queued messages are passed to Session.Redispatch.
+				bool wasStarted = this.session.Started;
+				if(wasStarted) { this.session.Stop(); }
+				try
+				{
+					listener += value;
+					this.session.Redispatch(this.unconsumedMessages);
+				}
+				finally
+				{
+					// Restart even when Redispatch throws, so the session is never left stopped.
+					if(wasStarted) { this.session.Start(); }
+				}
+			}
+			remove { listener -= value; }
+		}
+
 		#endregion
 
+		public void Start()
+		{
+			// A consumer whose message channel is closed is not restarted.
+			if(!this.unconsumedMessages.Closed)
+			{
+				// Mark started, restart the channel, then wake the session executor.
+				this.started.Value = true;
+				this.unconsumedMessages.Start();
+				this.session.Executor.Wakeup();
+			}
+		}
+
+		public void Stop()
+		{
+			this.started.Value = false;
+			this.unconsumedMessages.Stop();
+		}
+
+		public bool Iterate()
+		{
+			// Without a registered listener there is nothing to deliver to.
+			if(this.listener == null)
+			{
+				return false;
+			}
+
+			// Take at most one queued message per call; true means one was dispatched.
+			MessageDispatch pending = this.unconsumedMessages.DequeueNoWait();
+			if(pending == null) { return false; }
+			this.Dispatch(pending);
+			return true;
+		}
+
+		private void CheckClosed()
+		{
+			// The consumer counts as closed once its unconsumed-message channel is closed.
+			bool channelClosed = this.unconsumedMessages.Closed;
+			if(!channelClosed) { return; }
+			throw new NMSException("The Consumer has been Closed");
+		}
+
+		private void CheckMessageListener()
+		{
+			if(this.listener != null)
+			{
+				throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+			}
+		}
+
+		protected bool IsAutoAcknowledgeEach
+		{
+			get
+			{
+				return this.session.IsAutoAcknowledge ||
+					   (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue);
+			}
+		}
+
+	    protected bool IsAutoAcknowledgeBatch
+		{
+			get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
+		}
+
+        protected bool IsIndividualAcknowledge
+		{
+			get { return this.session.IsIndividualAcknowledge; }
+		}
+
+        protected bool IsClientAcknowledge
+		{
+			get { return this.session.IsClientAcknowledge; }
+		}
+
+	    internal bool Closed
+	    {
+            get { return this.unconsumedMessages.Closed; }
+	    }
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs Fri Nov 15 23:43:13 2013
@@ -19,7 +19,7 @@ using System;
 using System.IO;
 using Apache.NMS.Util;
 
-namespace Apache.NMS.MQTT.Commands
+namespace Apache.NMS.MQTT.Messages
 {
 	public class BytesMessage : MQTTMessage, IBytesMessage
 	{
@@ -34,7 +34,7 @@ namespace Apache.NMS.MQTT.Commands
 			return base.Clone();
 		}
 
-		public override void OnSend()
+		public virtual void OnSend()
 		{
 			base.OnSend();
 			StoreContent();
@@ -563,10 +563,10 @@ namespace Apache.NMS.MQTT.Commands
         /// </summary>
         private class LengthTrackerStream : Stream
         {
-            private readonly ActiveMQBytesMessage parent;
+            private readonly BytesMessage parent;
             private readonly Stream sink;
 
-            public LengthTrackerStream(Stream sink, ActiveMQBytesMessage parent) : base()
+            public LengthTrackerStream(Stream sink, BytesMessage parent) : base()
             {
                 this.sink = sink;
                 this.parent = parent;

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Fri Nov 15 23:43:13 2013
@@ -49,12 +49,7 @@ namespace Apache.NMS.MQTT.Messages
             return id != null ? id.GetHashCode() : base.GetHashCode();
         }
 
-	    public override byte GetDataStructureType()
-		{
-			return ID_ACTIVEMQMESSAGE;
-		}
-
-		public override object Clone()
+		public virtual object Clone()
 		{
 			MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
 
@@ -118,20 +113,6 @@ namespace Apache.NMS.MQTT.Messages
 			}
 		}
 
-        public override bool ReadOnlyProperties
-        {
-            get{ return base.ReadOnlyProperties; }
-            
-            set
-            {
-                if(this.propertyHelper != null)
-                {
-                    this.propertyHelper.ReadOnly = value;
-                }
-                base.ReadOnlyProperties = value;
-            }
-        }
-        
 		#region Properties
 
 		public IPrimitiveMap Properties
@@ -386,26 +367,6 @@ namespace Apache.NMS.MQTT.Messages
 		{
 			Properties[name] = value;
 		}
-
-		// MarshallAware interface
-		public override bool IsMarshallAware()
-		{
-			return true;
-		}
-
-		public override void BeforeMarshall(OpenWireFormat wireFormat)
-		{
-			MarshalledProperties = null;
-			if(properties != null)
-			{
-				MarshalledProperties = properties.Marshal();
-			}
-		}
-
-		public override Response Visit(ICommandVisitor visitor)
-		{
-			return visitor.ProcessMessage(this);
-		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs Fri Nov 15 23:43:13 2013
@@ -16,6 +16,7 @@
  */
 
 using System;
+using Apache.NMS.MQTT.Transport;
 
 namespace Apache.NMS.MQTT.Messages
 {
@@ -30,10 +31,8 @@ namespace Apache.NMS.MQTT.Messages
      */
     public class MessageDispatch : BaseCommand
     {
-        ConsumerId consumerId;
-        ActiveMQDestination destination;
-        Message message;
-        int redeliveryCounter;
+        Topic destination;
+        MQTTMessage message;
 
         ///
         /// <summery>
@@ -46,10 +45,8 @@ namespace Apache.NMS.MQTT.Messages
             return GetType().Name + "[ " + 
                 "commandId = " + this.CommandId + ", " + 
                 "responseRequired = " + this.ResponseRequired + ", " + 
-                "ConsumerId = " + ConsumerId + ", " + 
                 "Destination = " + Destination + ", " + 
-                "Message = " + Message + ", " + 
-                "RedeliveryCounter = " + RedeliveryCounter + " ]";
+                "Message = " + Message + " ]";
         }
 
         public Exception RollbackCause
@@ -58,38 +55,24 @@ namespace Apache.NMS.MQTT.Messages
             set { this.rollbackCause = value; }
         }
 
-        public ConsumerId ConsumerId
-        {
-            get { return consumerId; }
-            set { this.consumerId = value; }
-        }
-
-        public ActiveMQDestination Destination
+        public Topic Destination
         {
             get { return destination; }
             set { this.destination = value; }
         }
 
-        public Message Message
+        public MQTTMessage Message
         {
             get { return message; }
             set { this.message = value; }
         }
 
-        public int RedeliveryCounter
-        {
-            get { return redeliveryCounter; }
-            set { this.redeliveryCounter = value; }
-        }
-
         public override int GetHashCode()
         {
             int answer = 0;
 
-            answer = (answer * 37) + HashCode(ConsumerId);
             answer = (answer * 37) + HashCode(Destination);
             answer = (answer * 37) + HashCode(Message);
-            answer = (answer * 37) + HashCode(RedeliveryCounter);
 
             return answer;
         }
@@ -106,10 +89,6 @@ namespace Apache.NMS.MQTT.Messages
 
         public virtual bool Equals(MessageDispatch that)
         {
-            if(!Equals(this.ConsumerId, that.ConsumerId))
-            {
-                return false;
-            }
             if(!Equals(this.Destination, that.Destination))
             {
                 return false;
@@ -118,10 +97,6 @@ namespace Apache.NMS.MQTT.Messages
             {
                 return false;
             }
-            if(!Equals(this.RedeliveryCounter, that.RedeliveryCounter))
-            {
-                return false;
-            }
 
             return true;
         }

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Fri Nov 15 23:43:13 2013
@@ -16,6 +16,7 @@
 //
 using System;
 using System.Collections;
+using Apache.NMS.MQTT.Messages;
 
 namespace Apache.NMS.MQTT
 {
@@ -235,7 +236,7 @@ namespace Apache.NMS.MQTT
 
         public ITopic GetTopic(string name)
         {
-			return null;  // TODO
+			return new Topic(name);
         }
 
         public ITemporaryQueue CreateTemporaryQueue()
@@ -384,14 +385,14 @@ namespace Apache.NMS.MQTT
             }
         }
 
-        public void RemoveProducer(ProducerId objectId)
-        {
-            connection.RemoveProducer(objectId);
-            if(!this.closing)
-            {
-                producers.Remove(objectId);
-            }
-        }
+//        public void RemoveProducer(ProducerId objectId)
+//        {
+//            connection.RemoveProducer(objectId);
+//            if(!this.closing)
+//            {
+//                producers.Remove(objectId);
+//            }
+//        }
 
         private MQTTMessage ConfigureMessage(MQTTMessage message)
         {
@@ -416,7 +417,7 @@ namespace Apache.NMS.MQTT
         /// <param name="message">
         /// A <see cref="ActiveMQMessage"/>
         /// </param>
-        private static void DoNothingAcknowledge(ActiveMQMessage message)
+        private static void DoNothingAcknowledge(MQTTMessage message)
         {
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Fri Nov 15 23:43:13 2013
@@ -17,7 +17,7 @@
 
 using System;
 using System.Collections;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
 using Apache.NMS.MQTT.Util;
 using Apache.NMS.MQTT.Threads;
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs Fri Nov 15 23:43:13 2013
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-namespace Apache.NMS.MQTT
+namespace Apache.NMS.MQTT.Threads
 {
 	/// <summary>
 	/// Represents a task that may take a few iterations to complete.

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs?rev=1542430&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs Fri Nov 15 23:43:13 2013
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+	/// <summary>
+	/// A facility for applications to schedule tasks for future execution in a background 
+	/// thread. Tasks may be scheduled for one-time execution, or for repeated execution at 
+	/// regular intervals.  Unlike the normal .NET Timer this Timer allows for multiple tasks
+	/// to be scheduled in a single Timer object.  
+	/// 
+	/// Corresponding to each Timer object is a single background thread that is used to execute
+	/// all of the timer's tasks, sequentially. Timer tasks should complete quickly. If a timer 
+	/// task takes excessive time to complete, it "hogs" the timer's task execution thread. This
+	/// can, in turn, delay the execution of subsequent tasks, which may "bunch up" and execute 
+	/// in rapid succession when (and if) the offending task finally completes.
+	/// 
+	/// After the last live reference to a Timer object goes away and all outstanding tasks have 
+	/// completed execution, the timer's task execution thread terminates gracefully (and becomes
+	/// subject to garbage collection). However, this can take arbitrarily long to occur. By default, 
+	/// the task execution thread does not run as a Background thread, so it is capable of keeping an 
+	/// application from terminating. If a caller wants to terminate a timer's task execution thread
+	/// rapidly, the caller should invoke the timer's cancel method.
+	/// 
+	/// If the timer's task execution thread terminates unexpectedly, any further attempt to schedule
+	/// a task on the timer will result in an IllegalStateException, as if the timer's cancel method
+	/// had been invoked.
+	/// 
+	/// This class is thread-safe: multiple threads can share a single Timer object without the 
+	/// need for external synchronization.
+	/// 
+	/// This class does not offer real-time guarantees: it schedules tasks using the 
+	/// EventWaitHandle.WaitOne(TimeSpan) method.
+	/// </summary>
+	public class TimerEx
+	{
+		#region Static Id For Anonymous Timer Naming.
+
+		private static long timerId;
+
+		private static long NextId() 
+		{
+			return Interlocked.Increment(ref timerId);
+		}
+
+		#endregion
+
+		private readonly TimerImpl impl;
+
+	    // Used to finalize thread
+	    private readonly DisposeHelper disposal;
+
+	    public TimerEx(String name, bool isBackground)
+		{
+	    	// A null name is a caller error: report it as an argument failure rather than
+	    	// with NullReferenceException, which is reserved for the runtime itself.
+	    	if (name == null) { throw new ArgumentNullException("name", "name is null"); }
+
+	        this.impl = new TimerImpl(name, isBackground);
+	        this.disposal = new DisposeHelper(impl);
+		}
+
+	    public TimerEx(String name) : this(name, false)
+		{
+	    }
+
+	    public TimerEx(bool isBackground) : this("Timer-" + TimerEx.NextId().ToString(), isBackground)
+		{
+	    }
+
+	    public TimerEx() : this(false)
+		{
+	    }
+
+		/// <summary>
+		/// Terminates this timer, discarding any currently scheduled tasks. Does not interfere
+		/// with a currently executing task (if it exists). Once a timer has been terminated, 
+		/// its execution thread terminates gracefully, and no more tasks may be scheduled on it.
+		/// 
+		/// Note that calling this method from within the run method of a timer task that was 
+		/// invoked by this timer absolutely guarantees that the ongoing task execution is the 
+		/// last task execution that will ever be performed by this timer.
+		/// 
+		/// This method may be called repeatedly; the second and subsequent calls have no effect. 
+		/// </summary>
+	    public void Cancel() 
+		{
+	        this.impl.Cancel();
+	    }
+
+		/// <summary>
+		/// Removes all cancelled tasks from this timer's task queue. Calling this method has 
+		/// no effect on the behavior of the timer, but eliminates the references to the cancelled
+		/// tasks from the queue. If there are no external references to these tasks, they become 
+		/// eligible for garbage collection.
+		/// 
+		/// Most programs will have no need to call this method. It is designed for use by the 
+		/// rare application that cancels a large number of tasks. Calling this method trades 
+		/// time for space: the runtime of the method may be proportional to n + c log n, where 
+		/// n is the number of tasks in the queue and c is the number of cancelled tasks.
+		/// 
+		/// Note that it is permissible to call this method from within a task scheduled 
+		/// on this timer. 
+		/// </summary>
+	    public int Purge() 
+		{
+	        lock(this.impl.SyncRoot)
+			{
+	            return impl.Purge();
+	        }
+	    }
+
+		public override string ToString()
+		{
+			return string.Format("[TimerEx{0}]", this.impl.Name);
+		}
+
+		#region WaitCallback Scheduling Methods
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for execution at the specified time. If the 
+		/// time is in the past, the task is scheduled for immediate execution.  The method returns
+		/// a TimerTask instance that can be used to later cancel the scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, DateTime when) 
+		{
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+	        TimeSpan delay = when - DateTime.Now;
+	        DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for execution after the specified delay. 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, int delay)
+		{
+ 			if(delay < 0)
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+			DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(-1), false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for execution after the specified delay. 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, TimeSpan delay)
+		{
+ 			if(delay.CompareTo(TimeSpan.Zero) < 0)
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+			DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-delay execution, 
+		/// beginning after the specified delay. Subsequent executions take place at approximately 
+		/// regular intervals separated by the specified period.
+		/// 
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as 
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		/// 
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness." 
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, int delay, int period)
+		{
+ 			if(delay < 0 || period <= 0)
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+	        DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-delay execution, 
+		/// beginning after the specified delay. Subsequent executions take place at approximately 
+		/// regular intervals separated by the specified period.
+		/// 
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as 
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		/// 
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness." 
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, TimeSpan delay, TimeSpan period)
+		{
+ 			if(delay.CompareTo(TimeSpan.Zero) < 0 || period.CompareTo(TimeSpan.Zero) <= 0)
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+	        DoScheduleImpl(task, delay, period, false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-delay execution, 
+		/// beginning at the specified start time. Subsequent executions take place at approximately 
+		/// regular intervals separated by the specified period.
+		/// 
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as 
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		/// 
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness." 
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, DateTime when, int period) 
+		{
+	        if (period <= 0) 
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+	        
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+			TimeSpan delay = when - DateTime.Now;	        
+			DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-delay execution, 
+		/// beginning at the specified start time. Subsequent executions take place at approximately 
+		/// regular intervals separated by the specified period.
+		/// 
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as 
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		/// 
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness." 
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask Schedule(WaitCallback callback, object arg, DateTime when, TimeSpan period) 
+		{
+	        if (period.CompareTo(TimeSpan.Zero) <= 0) 
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+	        
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+			TimeSpan delay = when - DateTime.Now;	        
+			DoScheduleImpl(task, delay, period, false);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning 
+		/// after the specified delay. Subsequent executions take place at approximately regular 
+		/// intervals, separated by the specified period.
+		/// 
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid 
+		/// succession to "catch up." 
+		/// 
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to 
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled 
+		/// maintenance every day at a particular time.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, int delay, int period)
+		{
+ 			if(delay < 0 || period <= 0)
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+	        DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), true);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning 
+		/// after the specified delay. Subsequent executions take place at approximately regular 
+		/// intervals, separated by the specified period.
+		/// 
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid 
+		/// succession to "catch up." 
+		/// 
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to 
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled 
+		/// maintenance every day at a particular time.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, TimeSpan delay, TimeSpan period)
+		{
+ 			if(delay.CompareTo(TimeSpan.Zero) < 0 || period.CompareTo(TimeSpan.Zero) <= 0)
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+	        DoScheduleImpl(task, delay, period, true);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning 
+		/// at the specified time. Subsequent executions take place at approximately regular 
+		/// intervals, separated by the specified period.
+		/// 
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid 
+		/// succession to "catch up." 
+		/// 
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to 
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled 
+		/// maintenance every day at a particular time.
+		/// 
+		/// The method returns a TimerTask instance that can be used to later cancel the 
+		/// scheduled task.
+		/// </summary>
+	    public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, DateTime when, int period)
+		{
+	        if (period <= 0) 
+			{
+	            throw new ArgumentOutOfRangeException();
+	        }
+
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+	        TimeSpan delay = when - DateTime.Now;
+			DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), true);
+			return task;
+	    }
+
+		/// <summary>
+		/// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning
+		/// at the specified time. Subsequent executions take place at approximately regular
+		/// intervals, separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid
+		/// succession to "catch up."
+		///
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+		/// maintenance every day at a particular time.
+		/// </remarks>
+		/// <param name="callback">The callback to invoke on every execution; must not be null.</param>
+		/// <param name="arg">The state object handed to the callback on every execution.</param>
+		/// <param name="when">The time of the first execution.</param>
+		/// <param name="period">The interval between executions; must be greater than zero.</param>
+		/// <returns>A TimerTask instance that can be used to later cancel the scheduled task.</returns>
+		/// <exception cref="ArgumentOutOfRangeException">If period is zero or negative.</exception>
+		public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, DateTime when, TimeSpan period)
+		{
+			if (period <= TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			// Wrap the callback so it can be queued like any other TimerTask.
+			InternalTimerTask task = new InternalTimerTask(callback, arg);
+
+			// Convert the absolute start time into a delay relative to now.
+			TimeSpan delay = when - DateTime.Now;
+			DoScheduleImpl(task, delay, period, true);
+			return task;
+		}
+
+		#endregion
+
+		#region TimerTask Scheduling Methods
+
+		/// <summary>
+		/// Schedules the specified TimerTask for a single execution at the specified time.
+		/// If the time is already in the past the computed delay is negative, so the task
+		/// is due as soon as the timer thread next examines its queue.
+		/// </summary>
+		public void Schedule(TimerTask task, DateTime when)
+		{
+			// A negative period marks the task as non-repeating.
+			TimeSpan oneShot = TimeSpan.FromMilliseconds(-1);
+			TimeSpan untilDue = when.Subtract(DateTime.Now);
+
+			DoScheduleImpl(task, untilDue, oneShot, false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for a single execution after the specified delay.
+		/// </summary>
+		/// <param name="task">The task to run.</param>
+		/// <param name="delay">Milliseconds to wait before running the task; must not be negative.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If delay is negative.</exception>
+		public void Schedule(TimerTask task, int delay)
+		{
+			if (delay < 0)
+			{
+				throw new ArgumentOutOfRangeException("delay", delay, "The delay must not be negative.");
+			}
+
+			// The period of -1 ms marks the task as non-repeating.
+			DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(-1), false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for a single execution after the specified delay.
+		/// </summary>
+		/// <param name="task">The task to run.</param>
+		/// <param name="delay">Time to wait before running the task; must not be negative.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If delay is negative.</exception>
+		public void Schedule(TimerTask task, TimeSpan delay)
+		{
+			if (delay < TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("delay", delay, "The delay must not be negative.");
+			}
+
+			// The period of -1 ms marks the task as non-repeating.
+			DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+		/// after the specified delay. Subsequent executions take place at approximately
+		/// regular intervals separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		///
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="delay">Milliseconds to wait before the first execution; must not be negative.</param>
+		/// <param name="period">Milliseconds between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If delay is negative or period is not positive.</exception>
+		public void Schedule(TimerTask task, int delay, int period)
+		{
+			// Validate each argument separately so the exception names the offending one.
+			if (delay < 0)
+			{
+				throw new ArgumentOutOfRangeException("delay", delay, "The delay must not be negative.");
+			}
+
+			if (period <= 0)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+		/// after the specified delay. Subsequent executions take place at approximately
+		/// regular intervals separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		///
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="delay">Time to wait before the first execution; must not be negative.</param>
+		/// <param name="period">Interval between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If delay is negative or period is not positive.</exception>
+		public void Schedule(TimerTask task, TimeSpan delay, TimeSpan period)
+		{
+			// Validate each argument separately so the exception names the offending one.
+			if (delay < TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("delay", delay, "The delay must not be negative.");
+			}
+
+			if (period <= TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			DoScheduleImpl(task, delay, period, false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+		/// at the specified time. Subsequent executions take place at approximately
+		/// regular intervals separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		///
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="when">The time of the first execution.</param>
+		/// <param name="period">Milliseconds between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If period is zero or negative.</exception>
+		public void Schedule(TimerTask task, DateTime when, int period)
+		{
+			if (period <= 0)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			// Convert the absolute start time into a delay relative to now.
+			TimeSpan delay = when - DateTime.Now;
+			DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+		/// at the specified time. Subsequent executions take place at approximately
+		/// regular intervals separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-delay execution, each execution is scheduled relative to the actual execution
+		/// time of the previous execution. If an execution is delayed for any reason (such as
+		/// garbage collection or other background activity), subsequent executions will be delayed.
+		///
+		/// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+		/// In other words, it is appropriate for activities where it is more important to keep the
+		/// frequency accurate in the short run than in the long run.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="when">The time of the first execution.</param>
+		/// <param name="period">Interval between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If period is zero or negative.</exception>
+		public void Schedule(TimerTask task, DateTime when, TimeSpan period)
+		{
+			if (period <= TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			// Convert the absolute start time into a delay relative to now.
+			TimeSpan delay = when - DateTime.Now;
+			DoScheduleImpl(task, delay, period, false);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+		/// after the specified delay. Subsequent executions take place at approximately regular
+		/// intervals, separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid
+		/// succession to "catch up."
+		///
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+		/// maintenance every day at a particular time.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="delay">Milliseconds to wait before the first execution; must not be negative.</param>
+		/// <param name="period">Milliseconds between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If delay is negative or period is not positive.</exception>
+		public void ScheduleAtFixedRate(TimerTask task, int delay, int period)
+		{
+			// Validate each argument separately so the exception names the offending one.
+			if (delay < 0)
+			{
+				throw new ArgumentOutOfRangeException("delay", delay, "The delay must not be negative.");
+			}
+
+			if (period <= 0)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), true);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+		/// after the specified delay. Subsequent executions take place at approximately regular
+		/// intervals, separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid
+		/// succession to "catch up."
+		///
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+		/// maintenance every day at a particular time.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="delay">Time to wait before the first execution; must not be negative.</param>
+		/// <param name="period">Interval between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If delay is negative or period is not positive.</exception>
+		public void ScheduleAtFixedRate(TimerTask task, TimeSpan delay, TimeSpan period)
+		{
+			// Validate each argument separately so the exception names the offending one.
+			if (delay < TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("delay", delay, "The delay must not be negative.");
+			}
+
+			if (period <= TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			DoScheduleImpl(task, delay, period, true);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+		/// at the specified time. Subsequent executions take place at approximately regular
+		/// intervals, separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid
+		/// succession to "catch up."
+		///
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+		/// maintenance every day at a particular time.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="when">The time of the first execution.</param>
+		/// <param name="period">Milliseconds between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If period is zero or negative.</exception>
+		public void ScheduleAtFixedRate(TimerTask task, DateTime when, int period)
+		{
+			if (period <= 0)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			// Convert the absolute start time into a delay relative to now.
+			TimeSpan delay = when - DateTime.Now;
+			DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), true);
+		}
+
+		/// <summary>
+		/// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+		/// at the specified time. Subsequent executions take place at approximately regular
+		/// intervals, separated by the specified period.
+		/// </summary>
+		/// <remarks>
+		/// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+		/// time of the initial execution. If an execution is delayed for any reason (such as garbage
+		/// collection or other background activity), two or more executions will occur in rapid
+		/// succession to "catch up."
+		///
+		/// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+		/// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+		/// maintenance every day at a particular time.
+		/// </remarks>
+		/// <param name="task">The task to run.</param>
+		/// <param name="when">The time of the first execution.</param>
+		/// <param name="period">Interval between executions; must be greater than zero.</param>
+		/// <exception cref="ArgumentOutOfRangeException">If period is zero or negative.</exception>
+		public void ScheduleAtFixedRate(TimerTask task, DateTime when, TimeSpan period)
+		{
+			if (period <= TimeSpan.Zero)
+			{
+				throw new ArgumentOutOfRangeException("period", period, "The period must be greater than zero.");
+			}
+
+			// Convert the absolute start time into a delay relative to now.
+			TimeSpan delay = when - DateTime.Now;
+			DoScheduleImpl(task, delay, period, true);
+		}
+
+		#endregion
+
+		#region Implementation of Scheduling method.
+
+	    /// <summary>
+	    /// Shared implementation behind the Schedule and ScheduleAtFixedRate overloads: stamps
+	    /// the task with its timing information and places it on the timer's queue.
+	    /// </summary>
+	    /// <param name="task">The task to queue; must not be null, already scheduled or cancelled.</param>
+	    /// <param name="delay">Offset from now to the first execution; a negative value puts the due time in the past.</param>
+	    /// <param name="period">Interval between executions; a negative value means the task runs only once.</param>
+	    /// <param name="fixedRate">True for fixed-rate repetition, false for fixed-delay repetition.</param>
+	    /// <exception cref="ArgumentNullException">If task is null.</exception>
+	    /// <exception cref="InvalidOperationException">
+	    /// If the timer has been cancelled, or the task is already scheduled or cancelled.
+	    /// </exception>
+	    private void DoScheduleImpl(TimerTask task, TimeSpan delay, TimeSpan period, bool fixedRate)
+		{
+			if (task == null)
+			{
+				// First argument is the parameter name, second is the message.
+				throw new ArgumentNullException("task", "TimerTask cannot be null");
+			}
+
+			// Locks are taken timer first, then task, matching the order used by the timer thread.
+			lock(this.impl.SyncRoot)
+			{
+				if (impl.Cancelled)
+				{
+					throw new InvalidOperationException("Timer has been cancelled");
+				}
+
+				DateTime when = DateTime.Now + delay;
+
+				lock(task.syncRoot)
+				{
+					if (task.IsScheduled)
+					{
+						throw new InvalidOperationException("Task is already scheduled");
+					}
+
+					if (task.cancelled)
+					{
+						throw new InvalidOperationException("Task is already cancelled");
+					}
+
+					task.when = when;
+					task.period = period;
+					task.fixedRate = fixedRate;
+				}
+
+				// InsertTask pulses SyncRoot, so it must be called while the timer lock is held.
+				impl.InsertTask(task);
+			}
+		}
+
+		#endregion
+
+		#region Internal TimerTask for invoking WaitCallback tasks
+
+		/// <summary>
+		/// Adapts a WaitCallback and its state argument to the TimerTask contract so that
+		/// delegate-based work can be queued alongside ordinary TimerTask instances.
+		/// </summary>
+		private class InternalTimerTask : TimerTask
+		{
+			private readonly WaitCallback task;
+			private readonly object taskArg;
+
+			/// <summary>
+			/// Creates a task that invokes the given callback with the given argument.
+			/// </summary>
+			/// <exception cref="ArgumentNullException">If the callback is null.</exception>
+			public InternalTimerTask(WaitCallback task, object taskArg)
+			{
+				if (task == null)
+				{
+					// First argument is the parameter name, second is the message.
+					throw new ArgumentNullException("task", "The WaitCallback task cannot be null");
+				}
+
+				this.task = task;
+				this.taskArg = taskArg;
+			}
+
+			/// <summary>
+			/// Invokes the wrapped callback, passing it the stored argument.
+			/// </summary>
+			public override void Run()
+			{
+				this.task(taskArg);
+			}
+		}
+
+		#endregion
+
+		#region Timer Heap that sorts Tasks into timed order.
+
+        /// <summary>
+        /// Array-backed binary min-heap of TimerTask instances ordered by their 'when' field,
+        /// so the task that is due soonest sits at index 0. The class does no locking of its own.
+        /// </summary>
+        private sealed class TimerHeap
+        {
+            internal static readonly int DEFAULT_HEAP_SIZE = 256;
+
+            internal TimerTask[] timers = new TimerTask[DEFAULT_HEAP_SIZE];
+            internal int size = 0;
+            internal int deletedCancelledNumber = 0;
+
+            /// <summary>
+            /// Returns the entry at the root of the heap (null when the heap is empty).
+            /// </summary>
+            public TimerTask Minimum()
+            {
+                return timers[0];
+            }
+
+            public bool IsEmpty()
+            {
+                return size == 0;
+            }
+
+            /// <summary>
+            /// Adds a task, doubling the backing array when it is full.
+            /// </summary>
+            public void Insert(TimerTask task)
+            {
+                if (timers.Length == size)
+                {
+                    TimerTask[] appendedTimers = new TimerTask[size * 2];
+                    timers.CopyTo(appendedTimers, 0);
+                    timers = appendedTimers;
+                }
+                timers[size++] = task;
+                UpHeap(size - 1);
+            }
+
+            /// <summary>
+            /// Removes the entry at the given position; positions outside the live range are ignored.
+            /// </summary>
+            public void Delete(int pos)
+            {
+                // possible to delete any position of the heap
+                if (pos >= 0 && pos < size)
+                {
+                    timers[pos] = timers[--size];
+                    timers[size] = null;
+
+                    if (pos < size)
+                    {
+                        // The replacement came from the end of the array and may belong to a
+                        // different subtree, so it can be smaller than its new parent as well
+                        // as larger than its new children. Sift in both directions; at most
+                        // one of the two calls actually moves anything.
+                        DownHeap(pos);
+                        UpHeap(pos);
+                    }
+                }
+            }
+
+            // Moves the entry at 'pos' towards the root while it is due earlier than its parent.
+            private void UpHeap(int pos)
+            {
+                int current = pos;
+
+                while (current > 0)
+                {
+                    int parent = (current - 1) / 2;
+                    if (!(timers[current].when < timers[parent].when))
+                    {
+                        break;
+                    }
+
+                    // swap the two
+                    TimerTask tmp = timers[current];
+                    timers[current] = timers[parent];
+                    timers[parent] = tmp;
+
+                    current = parent;
+                }
+            }
+
+            // Moves the entry at 'pos' towards the leaves while a child is due no later than it.
+            private void DownHeap(int pos)
+            {
+                int current = pos;
+                int child = 2 * current + 1;
+
+                while (child < size)
+                {
+                    // pick the earlier of the two children when both exist
+                    if (child + 1 < size && timers[child + 1].when < timers[child].when)
+                    {
+                        child++;
+                    }
+
+                    // stop once the parent is strictly earlier than the selected child
+                    if (timers[current].when < timers[child].when)
+                    {
+                        break;
+                    }
+
+                    // swap the two
+                    TimerTask tmp = timers[current];
+                    timers[current] = timers[child];
+                    timers[child] = tmp;
+
+                    current = child;
+                    child = 2 * current + 1;
+                }
+            }
+
+            /// <summary>
+            /// Discards every entry and returns to the default capacity.
+            /// </summary>
+            public void Reset()
+            {
+                timers = new TimerTask[DEFAULT_HEAP_SIZE];
+                size = 0;
+            }
+
+            /// <summary>
+            /// Restores heap order after the root entry's 'when' value has changed.
+            /// </summary>
+            public void AdjustMinimum()
+            {
+                DownHeap(0);
+            }
+
+            /// <summary>
+            /// Removes every cancelled entry, adding the number removed to deletedCancelledNumber.
+            /// </summary>
+            public void DeleteIfCancelled()
+            {
+                // Compact the live entries to the front of the array, then rebuild the heap.
+                // Deleting one entry at a time while scanning could move an unvisited entry
+                // in front of the scan position, where it would never be examined.
+                int kept = 0;
+                for (int i = 0; i < size; i++)
+                {
+                    TimerTask candidate = timers[i];
+                    if (candidate.cancelled)
+                    {
+                        deletedCancelledNumber++;
+                    }
+                    else
+                    {
+                        timers[kept++] = candidate;
+                    }
+                }
+
+                // clear the now-unused tail so removed tasks are not kept reachable
+                for (int i = kept; i < size; i++)
+                {
+                    timers[i] = null;
+                }
+                size = kept;
+
+                // bottom-up heap construction over the surviving entries
+                for (int i = size / 2 - 1; i >= 0; i--)
+                {
+                    DownHeap(i);
+                }
+            }
+
+            /// <summary>
+            /// Returns the index of the given task within the heap, or -1 when it is not present.
+            /// </summary>
+            internal int GetTask(TimerTask task)
+            {
+                // only the first 'size' slots hold live entries
+                for (int i = 0; i < size; i++)
+                {
+                    if (timers[i] == task)
+                    {
+                        return i;
+                    }
+                }
+                return -1;
+            }
+        }
+
+		#endregion
+
+		#region TimerEx Task Runner Implementation
+
+		/// <summary>
+		/// Owns the timer's worker thread and its queue of pending tasks. The queue and the
+		/// cancelled / finished flags are meant to be touched only while SyncRoot is held;
+		/// InsertTask and Purge rely on their callers to hold it.
+		/// </summary>
+		private sealed class TimerImpl
+		{
+			private bool cancelled;
+			private bool finished;
+			private String name;
+			private TimerHeap tasks = new TimerHeap();
+			private System.Threading.Thread runner;
+			private object syncRoot = new object();
+
+			/// <summary>
+			/// Creates the worker thread with the given name and background flag and starts it.
+			/// </summary>
+			public TimerImpl(String name, bool isBackground)
+			{
+				this.name = name;
+				this.runner = new Thread(new ThreadStart(this.Run));
+				this.runner.Name = name;
+				this.runner.IsBackground = isBackground;
+				this.runner.Start();
+			}
+
+			public String Name
+			{
+				get { return this.name; }
+			}
+
+			public object SyncRoot
+			{
+				get { return this.syncRoot; }
+			}
+
+			public bool Cancelled
+			{
+				get { return this.cancelled; }
+			}
+
+			public bool Finished
+			{
+				set { this.finished = value; }
+			}
+
+			/// <summary>
+			/// Run this Timers event loop in its own Thread. The loop exits when the timer is
+			/// cancelled, or when it has been marked finished and the queue is empty.
+			/// </summary>
+			public void Run()
+			{
+				while (true)
+				{
+					TimerTask task;
+					lock (this.syncRoot)
+					{
+						// need to check cancelled inside the synchronized block
+						if (cancelled)
+						{
+							return;
+						}
+
+						if (tasks.IsEmpty())
+						{
+							if (finished)
+							{
+								return;
+							}
+
+							// no tasks scheduled -- sleep until any task appear
+							try
+							{
+								Monitor.Wait(this.syncRoot);
+							}
+							catch (ThreadInterruptedException)
+							{
+								// deliberately ignored: the loop re-evaluates its state
+							}
+							continue;
+						}
+
+						DateTime currentTime = DateTime.Now;
+
+						task = tasks.Minimum();
+						TimeSpan timeToSleep;
+
+						lock (task.syncRoot)
+						{
+							if (task.cancelled)
+							{
+								tasks.Delete(0);
+								continue;
+							}
+
+							// check the time to sleep for the first task scheduled
+							timeToSleep = task.when - currentTime;
+						}
+
+						if (timeToSleep.CompareTo(TimeSpan.Zero) > 0)
+						{
+							// sleep! Waiting releases syncRoot, so a newly inserted task or a
+							// cancel / dispose pulse wakes the loop early.
+							try
+							{
+								Monitor.Wait(this.syncRoot, timeToSleep);
+							}
+							catch (ThreadInterruptedException)
+							{
+								// deliberately ignored: the loop re-evaluates its state
+							}
+							continue;
+						}
+
+						// no sleep is necessary before launching the task
+						lock (task.syncRoot)
+						{
+							int pos = 0;
+							if (tasks.Minimum().when != task.when)
+							{
+								pos = tasks.GetTask(task);
+							}
+							if (task.cancelled)
+							{
+								tasks.Delete(tasks.GetTask(task));
+								continue;
+							}
+
+							// set time to schedule
+							task.ScheduledTime = task.when;
+
+							// remove task from queue
+							tasks.Delete(pos);
+
+							// set when the next task should be launched
+							if (task.period.CompareTo(TimeSpan.Zero) >= 0)
+							{
+								// this is a repeating task,
+								if (task.fixedRate)
+								{
+									// task is scheduled at fixed rate
+									task.when = task.when + task.period;
+								}
+								else
+								{
+									// task is scheduled at fixed delay
+									task.when = DateTime.Now + task.period;
+								}
+
+								// insert this task into queue
+								InsertTask(task);
+							}
+							else
+							{
+								// one-shot task: clear the pending time
+								task.when = DateTime.MinValue;
+							}
+						}
+					}
+
+					// The task body runs outside the timer lock.
+					bool taskCompletedNormally = false;
+					try
+					{
+						task.Run();
+						taskCompletedNormally = true;
+					}
+					finally
+					{
+						if (!taskCompletedNormally)
+						{
+							// The task threw: mark the timer cancelled under the same lock that
+							// guards every other access to the flag. The exception is not caught
+							// here, so it continues to propagate out of Run.
+							lock (this.syncRoot)
+							{
+								cancelled = true;
+							}
+						}
+					}
+				}
+			}
+
+			/// <summary>
+			/// Adds a task to the queue and signals the worker thread.
+			/// </summary>
+			public void InsertTask(TimerTask newTask)
+			{
+				// callers are synchronized
+				tasks.Insert(newTask);
+				Monitor.Pulse(this.syncRoot);
+			}
+
+			/// <summary>
+			/// Marks the timer cancelled, drops every queued task and wakes the worker thread.
+			/// </summary>
+			public void Cancel()
+			{
+				lock(this.syncRoot)
+				{
+					cancelled = true;
+					tasks.Reset();
+					Monitor.Pulse(this.syncRoot);
+				}
+			}
+
+			/// <summary>
+			/// Removes cancelled tasks from the queue and returns how many were removed.
+			/// </summary>
+			public int Purge()
+			{
+				if (tasks.IsEmpty())
+				{
+					return 0;
+				}
+
+				// callers are synchronized
+				tasks.deletedCancelledNumber = 0;
+				tasks.DeleteIfCancelled();
+				return tasks.deletedCancelledNumber;
+			}
+		}
+
+		#endregion
+
+		#region Helper class to handle Timer shutdown when Disposed
+
+		/// <summary>
+		/// Disposable handle that tells a TimerImpl it is finished and wakes any thread
+		/// waiting on the timer's lock.
+		/// </summary>
+		private sealed class DisposeHelper : IDisposable
+		{
+			private readonly TimerImpl impl;
+
+			public DisposeHelper(TimerImpl impl)
+			{
+				this.impl = impl;
+			}
+
+			/// <summary>
+			/// Sets the timer's Finished flag and pulses all waiters, both under the timer lock.
+			/// </summary>
+			public void Dispose()
+			{
+				object gate = this.impl.SyncRoot;
+
+				lock (gate)
+				{
+					this.impl.Finished = true;
+					Monitor.PulseAll(gate);
+				}
+			}
+		}
+
+		#endregion
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs?rev=1542430&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs Fri Nov 15 23:43:13 2013
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+	/// <summary>
+	/// Base class for a unit of work that a Timer instance runs either once or repeatedly.
+	/// Subclasses supply the work by overriding Run.
+	/// </summary>
+	public abstract class TimerTask
+	{
+		// Guards the timing fields and the cancelled flag below.
+		internal object syncRoot = new object();
+
+		// Next due time; DateTime.MinValue means no execution is pending.
+		internal DateTime when = DateTime.MinValue;
+		// Due time recorded for the most recent execution; DateTime.MinValue until one is recorded.
+		internal DateTime scheduledTime = DateTime.MinValue;
+		internal TimeSpan period;
+		internal bool cancelled;
+		internal bool fixedRate;
+
+		protected TimerTask()
+		{
+		}
+
+		/// <summary>
+		/// Marks this task as cancelled.
+		/// </summary>
+		/// <returns>
+		/// True when the task was not already cancelled and still had a pending due time.
+		/// </returns>
+		public bool Cancel()
+		{
+			bool wasPending;
+
+			lock (this.syncRoot)
+			{
+				wasPending = when != DateTime.MinValue && !cancelled;
+				cancelled = true;
+			}
+
+			return wasPending;
+		}
+
+		/// <summary>
+		/// The due time recorded for the most recent execution of this task.
+		/// </summary>
+		public DateTime ScheduledExecutionTime
+		{
+			get
+			{
+				lock (this.syncRoot)
+				{
+					DateTime recorded = this.scheduledTime;
+					return recorded;
+				}
+			}
+		}
+
+		/// <summary>
+		/// The work performed each time the task executes.
+		/// </summary>
+		public abstract void Run();
+
+		#region Timer Methods
+
+		// Synchronized read of the next due time.
+		internal DateTime When
+		{
+			get
+			{
+				lock (this.syncRoot)
+				{
+					DateTime due = this.when;
+					return due;
+				}
+			}
+		}
+
+		// Synchronized write of the recorded execution time.
+		internal DateTime ScheduledTime
+		{
+			set
+			{
+				lock (this.syncRoot)
+				{
+					this.scheduledTime = value;
+				}
+			}
+		}
+
+		// True once the task has been given a due time or has had an execution time recorded.
+		internal bool IsScheduled
+		{
+			get
+			{
+				lock (this.syncRoot)
+				{
+					bool neverTouched = when == DateTime.MinValue && scheduledTime == DateTime.MinValue;
+					return !neverTouched;
+				}
+			}
+		}
+
+		#endregion
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs
------------------------------------------------------------------------------
    svn:eol-style = native