You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/19 19:32:02 UTC

svn commit: r1543529 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src: main/csharp/ main/csharp/Commands/ main/csharp/Util/ test/csharp/

Author: tabish
Date: Tue Nov 19 18:32:02 2013
New Revision: 1543529

URL: http://svn.apache.org/r1543529
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs Tue Nov 19 18:32:02 2013
@@ -30,12 +30,12 @@ namespace Apache.NMS.MQTT.Commands
 			set { this.returnCode = value; }
 		}
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "CONNACK"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs Tue Nov 19 18:32:02 2013
@@ -32,12 +32,12 @@ namespace Apache.NMS.MQTT.Commands
 		public const byte TYPE = 1;
 		public const String PROTOCOL_NAME = "MQIsdp";
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "CONNECT"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 14;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "DISCONNECT"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 11;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PINGREQ"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 13;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PINGRESP"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 4;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PUBACK"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 7;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PUBCOMP"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs Tue Nov 19 18:32:02 2013
@@ -32,12 +32,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 3;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PUBLISH"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 5;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PUBREC"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 6;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "PUBREL"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs Tue Nov 19 18:32:02 2013
@@ -29,12 +29,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 8;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "SUBACK"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs Tue Nov 19 18:32:02 2013
@@ -27,12 +27,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 7;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "SUBSCRIBE"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 10;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "UNSUBACK"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
 	{
 		public const byte TYPE = 9;
 
-		public int CommandType
+		public override int CommandType
 		{
 			get { return TYPE; }
 		}
 
-		public string CommandName
+		public override string CommandName
 		{
 			get { return "UNSUBSCRIBE"; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Tue Nov 19 18:32:02 2013
@@ -28,7 +28,6 @@ namespace Apache.NMS.MQTT
 {
 	public class Connection : IConnection
 	{
-		private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
 		private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(Timeout.Infinite);
 
 		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
@@ -38,14 +37,12 @@ namespace Apache.NMS.MQTT
         private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
 		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-        private readonly object myLock = new object();
         private readonly Atomic<bool> connected = new Atomic<bool>(false);
         private readonly Atomic<bool> closed = new Atomic<bool>(false);
         private readonly Atomic<bool> closing = new Atomic<bool>(false);
         private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
 		private readonly object connectedLock = new object();
         private Exception firstFailureError = null;
-		private bool userSpecifiedClientID;
         private int sessionCounter = 0;
         private readonly Atomic<bool> started = new Atomic<bool>(false);
         private ConnectionMetaData metaData = null;
@@ -55,6 +52,8 @@ namespace Apache.NMS.MQTT
         private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 		private readonly IdGenerator clientIdGenerator;
 		private IRedeliveryPolicy redeliveryPolicy;
+		private Scheduler scheduler = null;
+		private bool userSpecifiedClientID;
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
@@ -63,6 +62,7 @@ namespace Apache.NMS.MQTT
 
 			SetTransport(transport);
 
+			this.messageTransformation = new MQTTMessageTransformation(this);
 			this.info = new CONNECT();
 		}
 
@@ -121,21 +121,21 @@ namespace Apache.NMS.MQTT
             get { return info.ClientId; }
             set
             {
-                if(this.connected.Value)
-                {
-                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
-                }
-
-                this.info.ClientId = value;
-                this.userSpecifiedClientID = true;
-                CheckConnected();
+				if(this.connected.Value)
+				{
+					throw new NMSException("You cannot change the ClientId once the Connection is connected");
+				}
+
+				this.info.ClientId = value;
+				this.userSpecifiedClientID = true;
+				CheckConnected();
             }
         }
 
 		/// <summary>
 		/// The Default Client Id used if the ClientId property is not set explicity.
 		/// </summary>
-		public string DefaultClientId
+		internal string DefaultClientId
 		{
 			set
 			{
@@ -282,7 +282,8 @@ namespace Apache.NMS.MQTT
 		protected virtual Session CreateMQTTSession(AcknowledgementMode ackMode)
 		{
 			CheckConnected();
-			return new Session(this, ackMode);
+			int sessionId = Interlocked.Increment(ref sessionCounter);
+			return new Session(this, ackMode, sessionId);
 		}
 
 		internal void AddSession(Session session)
@@ -340,7 +341,81 @@ namespace Apache.NMS.MQTT
 
 		public void Close()
 		{
-			// TODO
+			if(!this.closed.Value && !transportFailed.Value)
+			{
+				this.Stop();
+			}
+
+			lock(connectedLock)
+			{
+				if(this.closed.Value)
+				{
+					return;
+				}
+
+				try
+				{
+					Tracer.InfoFormat("Connection[{0}]: Closing Connection Now.", this.ClientId);
+					this.closing.Value = true;
+
+                    Scheduler scheduler = this.scheduler;
+                    if (scheduler != null) 
+					{
+                        try 
+						{
+                            scheduler.Stop();
+                        } 
+						catch (Exception e) 
+						{
+                            throw NMSExceptionSupport.Create(e);
+                        }
+                    }
+
+					lock(sessions.SyncRoot)
+					{
+						foreach(Session session in sessions)
+						{
+							session.Shutdown();
+						}
+					}
+					sessions.Clear();
+
+					// Connected is true only when we've successfully sent our CONNECT
+					// to the broker, so if we haven't announced ourselves there's no need to
+					// inform the broker of a remove, and if the transport is failed, why bother.
+					if(connected.Value && !transportFailed.Value)
+					{
+						DISCONNECT disconnect = new DISCONNECT();
+						transport.Oneway(disconnect);
+					}
+
+					executor.Shutdown();
+					if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+					{
+						Tracer.DebugFormat("Connection[{0}]: Failed to properly shutdown its executor", this.ClientId);
+					}
+
+					Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ClientId);
+					transport.Stop();
+					transport.Dispose();
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ClientId, ex);
+				}
+				finally
+				{
+					if(executor != null)
+					{
+						executor.Shutdown();
+					}
+
+					this.transport = null;
+					this.closed.Value = true;
+					this.connected.Value = false;
+					this.closing.Value = false;
+				}
+			}
 		}
 
 		public void Dispose()
@@ -520,6 +595,36 @@ namespace Apache.NMS.MQTT
 				}
 			}
 		}
+
+	    internal Scheduler Scheduler
+		{
+			get
+			{
+		        Scheduler result = this.scheduler;
+		        if (result == null) 
+				{
+		            lock (this) 
+					{
+		                result = scheduler;
+		                if (result == null) 
+						{
+		                    CheckClosed();
+		                    try 
+							{
+		                        result = scheduler = new Scheduler(
+									"MQTTConnection["+this.info.ClientId+"] Scheduler");
+		                        scheduler.Start();
+		                    }
+							catch(Exception e)
+							{
+		                        throw NMSExceptionSupport.Create(e);
+		                    }
+		                }
+		            }
+		        }
+		        return result;
+			}
+	    }
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs Tue Nov 19 18:32:02 2013
@@ -107,6 +107,8 @@ namespace Apache.NMS.MQTT
 
                 if(this.clientId != null)
                 {
+					// Set the connection factory version as the default, the user can
+					// still override this via a call to Connection.ClientId = XXX
                     connection.DefaultClientId = this.clientId;
                 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Tue Nov 19 18:32:02 2013
@@ -38,11 +38,39 @@ namespace Apache.NMS.MQTT
 		private ThreadPoolExecutor executor;
 		private int consumerId;
 		protected bool disposed = false;
+		private Topic destination = null;
 
 		private event MessageListener listener;
 
-		public MessageConsumer()
+		public MessageConsumer(Session session, Topic destination, int consumerId)
 		{
+			if(destination == null)
+			{
+				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+            }
+            else if(destination.TopicName == null)
+            {
+                throw new InvalidDestinationException("The destination object was not given a physical name.");
+            }
+
+			this.session = session;
+			this.consumerId = consumerId;
+			this.destination = destination;
+			this.messageTransformation = this.session.Connection.MessageTransformation;
+			this.unconsumedMessages = new FifoMessageDispatchChannel();
+
+			// If the destination contained a URI query, then use it to set public properties
+			// on the ConsumerInfo
+			if(destination.Options != null)
+			{
+				// Get options prefixed with "consumer.*"
+				StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+				// Extract out custom extension options "consumer.nms.*"
+				StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
+				URISupport.SetProperties(this, options);
+				URISupport.SetProperties(this, customConsumerOptions, "nms.");
+			}
 		}
 
 		#region Property Accessors
@@ -94,6 +122,11 @@ namespace Apache.NMS.MQTT
 			get { return this.consumerId; }
 		}
 
+		public Topic Destination
+		{
+			get { return this.destination; }
+		}
+
 		#endregion
 
 		public void Start()

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Tue Nov 19 18:32:02 2013
@@ -36,11 +36,20 @@ namespace Apache.NMS.MQTT
 
 		private readonly MessageTransformation messageTransformation;
 
-		public MessageProducer(Session session, TimeSpan requestTimeout)
+		public MessageProducer(Session session, Topic destination, TimeSpan requestTimeout, int producerId)
 		{
 			this.session = session;
 			this.RequestTimeout = requestTimeout;
+			this.producerId = producerId;
+			this.destination = destination;
 			this.messageTransformation = session.Connection.MessageTransformation;
+
+			// If the destination contained a URI query, then use it to set public
+			// properties on the ProducerInfo
+			if (destination != null && destination.Options != null)
+			{
+				URISupport.SetProperties(this, destination.Options, "producer.");
+			}
 		}
 
 		~MessageProducer()

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Tue Nov 19 18:32:02 2013
@@ -16,6 +16,7 @@
 //
 using System;
 using System.Collections;
+using System.Threading;
 using Apache.NMS.MQTT.Messages;
 
 namespace Apache.NMS.MQTT
@@ -43,9 +44,10 @@ namespace Apache.NMS.MQTT
 
         private readonly AcknowledgementMode acknowledgementMode;
         private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+        private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
         private TimeSpan requestTimeout;
 
-		public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+		public Session(Connection connection, AcknowledgementMode acknowledgementMode, int sessionId)
 		{
             this.connection = connection;
             this.acknowledgementMode = acknowledgementMode;
@@ -54,6 +56,7 @@ namespace Apache.NMS.MQTT
             this.ProducerTransformer = connection.ProducerTransformer;
 
             this.executor = new SessionExecutor(this, this.consumers);
+			this.sessionId = sessionId;
 
             if(connection.IsStarted)
             {
@@ -146,6 +149,11 @@ namespace Apache.NMS.MQTT
             set { this.requestTimeout = value; }
         }
 
+		public int SessionId
+		{
+			get { return this.sessionId; }
+		}
+
         private ConsumerTransformerDelegate consumerTransformer;
         /// <summary>
         /// A Delegate that is called each time a Message is dispatched to allow the client to do
@@ -221,8 +229,65 @@ namespace Apache.NMS.MQTT
 
 		internal void DoClose()
 		{
+			Shutdown();
 		}
 
+        internal void Shutdown()
+        {
+            Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.SessionId);
+
+            if(this.closed)
+            {
+                return;
+            }
+
+            lock(myLock)
+            {
+                if(this.closed || this.closing)
+                {
+                    return;
+                }
+
+                try
+                {
+                    this.closing = true;
+
+                    // Stop all message deliveries from this Session
+                    this.executor.Stop(this.closeStopTimeout);
+
+                    lock(consumers.SyncRoot)
+                    {
+                        foreach(MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.FailureError = this.connection.FirstFailureError;
+                            consumer.Shutdown();
+                        }
+                    }
+                    consumers.Clear();
+
+                    lock(producers.SyncRoot)
+                    {
+                        foreach(MessageProducer producer in producers.Values)
+                        {
+                            producer.Shutdown();
+                        }
+                    }
+                    producers.Clear();
+
+                    Connection.RemoveSession(this);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+                finally
+                {
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
         public IMessageProducer CreateProducer()
         {
             return CreateProducer(null);
@@ -476,12 +541,22 @@ namespace Apache.NMS.MQTT
         /// if the message is in a transaction.
         /// </summary>
         /// <param name="message">
-        /// A <see cref="ActiveMQMessage"/>
+        /// A <see cref="MQTTMessage"/>
         /// </param>
         private static void DoNothingAcknowledge(MQTTMessage message)
         {
         }
 
+		private int NextConsumerId
+		{
+			get { return Interlocked.Increment(ref this.consumerCounter); }
+		}
+
+		private int NextProducerId
+		{
+			get { return Interlocked.Increment(ref this.producerCounter); }
+		}
+
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs Tue Nov 19 18:32:02 2013
@@ -15,6 +15,7 @@
 // limitations under the License.
 //
 using System;
+using System.Collections.Specialized;
 
 namespace Apache.NMS.MQTT
 {
@@ -23,7 +24,8 @@ namespace Apache.NMS.MQTT
 	/// </summary>
 	public class Topic : ITopic
 	{
-		string name;
+		private string name;
+		private StringDictionary options = null;
 
 		public Topic(string name)
 		{
@@ -54,6 +56,16 @@ namespace Apache.NMS.MQTT
 		{ 
 			get { return false; }
 		}
+
+		/// <summary>
+		/// Dictionary of name/value pairs representing option values specified
+		/// in the URI used to create this Destination.  A null value is returned
+		/// if no options were specified.
+		/// </summary>
+		internal StringDictionary Options
+		{
+			get { return this.options; }
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs Tue Nov 19 18:32:02 2013
@@ -17,7 +17,7 @@
 
 using System;
 using Apache.NMS.Util;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
 
 namespace Apache.NMS.MQTT.Util
 {
@@ -39,7 +39,9 @@ namespace Apache.NMS.MQTT.Util
 
         protected override IBytesMessage DoCreateBytesMessage()
         {
-			return null;
+			BytesMessage message = new BytesMessage();
+			message.Connection = connection;
+			return message;
         }
 
         protected override ITextMessage DoCreateTextMessage()

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=1543529&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs Tue Nov 19 18:32:02 2013
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// ------------------------------------------------------------------------------
+//  <autogenerated>
+//      This code was generated by a tool.
+//      Mono Runtime Version: 4.0.30319.1
+// 
+//      Changes to this file may cause incorrect behavior and will be lost if 
+//      the code is regenerated.
+//  </autogenerated>
+// ------------------------------------------------------------------------------
+
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyDescriptionAttribute("Apache NMS for MQTT Class Library (.Net Messaging Library Implementation): An implementation of the NMS API for MQTT")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
+[assembly: AssemblyProductAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
+

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
------------------------------------------------------------------------------
    svn:eol-style = native