You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/19 17:38:20 UTC

svn commit: r1543485 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk: ./ src/main/csharp/ src/main/csharp/Messages/ src/main/csharp/Transport/

Author: tabish
Date: Tue Nov 19 16:38:19 2013
New Revision: 1543485

URL: http://svn.apache.org/r1543485
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs Tue Nov 19 16:38:19 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
 [assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
 [assembly: AssemblyTrademarkAttribute("")]
 [assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3237")]
+[assembly: AssemblyVersionAttribute("1.7.0.3244")]
 [assembly: AssemblyInformationalVersionAttribute("1.7.0")]
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Tue Nov 19 16:38:19 2013
@@ -302,29 +302,29 @@ namespace Apache.NMS.MQTT
 			}
 		}
 
-//		internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
-//		{
-//			if(!this.closing.Value)
-//			{
-//				this.dispatchers.Add(id, dispatcher);
-//			}
-//		}
-//
-//		internal void RemoveDispatcher(ConsumerId id)
-//		{
-//			if(!this.closing.Value)
-//			{
-//				this.dispatchers.Remove(id);
-//			}
-//		}
-//
-//		internal void AddProducer(ProducerId id, MessageProducer producer)
-//		{
-//			if(!this.closing.Value)
-//			{
-//				this.producers.Add(id, producer);
-//			}
-//		}
+		internal void AddDispatcher(int id, IDispatcher dispatcher)
+		{
+			if(!this.closing.Value)
+			{
+				this.dispatchers.Add(id, dispatcher);
+			}
+		}
+
+		internal void RemoveDispatcher(int id)
+		{
+			if(!this.closing.Value)
+			{
+				this.dispatchers.Remove(id);
+			}
+		}
+
+		internal void AddProducer(int id, MessageProducer producer)
+		{
+			if(!this.closing.Value)
+			{
+				this.producers.Add(id, producer);
+			}
+		}
 
 		internal void RemoveProducer(int id)
 		{
@@ -520,7 +520,6 @@ namespace Apache.NMS.MQTT
 				}
 			}
 		}
-
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs Tue Nov 19 16:38:19 2013
@@ -36,7 +36,6 @@ namespace Apache.NMS.MQTT.Messages
 
 		public virtual void OnSend()
 		{
-			base.OnSend();
 			StoreContent();
 		}
 
@@ -491,7 +490,8 @@ namespace Apache.NMS.MQTT.Messages
                     EndianBinaryReader reader = new EndianBinaryReader(target);
                     this.length = reader.ReadInt32();
                     
-                    target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
+					// TODO we could compress from 
+                    // target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
                 }
                 else
                 {
@@ -510,14 +510,14 @@ namespace Apache.NMS.MQTT.Messages
 				this.outputBuffer = new MemoryStream();
                 Stream target = this.outputBuffer;
 
-                if(this.Connection != null && this.Connection.UseCompression)
-                {
-                    this.length = 0;
-					this.Compressed = true;
-
-                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);                    
-                    target = new LengthTrackerStream(target, this);
-                }
+//                if(this.Connection != null && this.Connection.UseCompression)
+//                {
+//                    this.length = 0;
+//					this.Compressed = true;
+//
+//                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);                    
+//                    target = new LengthTrackerStream(target, this);
+//                }
                 
 				this.dataOut = new EndianBinaryWriter(target);
 			}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Tue Nov 19 16:38:19 2013
@@ -26,14 +26,19 @@ namespace Apache.NMS.MQTT.Messages
 	public class MQTTMessage : IMessage, ICloneable
 	{
 		private readonly PUBLISH publish = new PUBLISH();
-		private MessagePropertyIntercepter propertyHelper;
-		private PrimitiveMap properties;
 		private Connection connection;
 		private Topic destination;
 		private short messageId;
+		private byte[] content;
+        private bool compressed;
+		private int redeliveryCounter;
+        private bool persistent;
 
 		public event AcknowledgeHandler Acknowledger;
 
+        private bool readOnlyMsgProperties;
+        private bool readOnlyMsgBody;
+
 		public static MQTTMessage Transform(IMessage message)
 		{
 			return (MQTTMessage) message;
@@ -45,17 +50,12 @@ namespace Apache.NMS.MQTT.Messages
 
         public override int GetHashCode()
         {
-            MessageId id = this.MessageId;
-
-            return id != null ? id.GetHashCode() : base.GetHashCode();
+            return messageId != 0 ? messageId : base.GetHashCode();
         }
 
 		public virtual object Clone()
 		{
-			MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
-
-			cloneMessage.propertyHelper = new MessagePropertyIntercepter(cloneMessage, cloneMessage.properties, this.ReadOnlyProperties) { AllowByteArrays = false };
-			return cloneMessage;
+			return this.MemberwiseClone();
 		}
 
         public override bool Equals(object that)
@@ -69,10 +69,10 @@ namespace Apache.NMS.MQTT.Messages
 
         public virtual bool Equals(MQTTMessage that)
         {
-            MessageId oMsg = that.MessageId;
-            MessageId thisMsg = this.MessageId;
+            short oMsg = that.MessageId;
+            short thisMsg = this.MessageId;
             
-            return thisMsg != null && oMsg != null && oMsg.Equals(thisMsg);
+            return thisMsg != 0 && oMsg != 0 && oMsg == thisMsg;
         }
         
 		public void Acknowledge()
@@ -113,21 +113,35 @@ namespace Apache.NMS.MQTT.Messages
 
 		#region Properties
 
+        public byte[] Content
+        {
+            get { return content; }
+            set { this.content = value; }
+        }
+
+        public bool Compressed
+        {
+            get { return compressed; }
+            set { this.compressed = value; }
+        }
+
+        public virtual bool ReadOnlyProperties
+        {
+            get { return this.readOnlyMsgProperties; }
+            set { this.readOnlyMsgProperties = value; }
+        }
+
+        public virtual bool ReadOnlyBody
+        {
+            get { return this.readOnlyMsgBody; }
+            set { this.readOnlyMsgBody = value; }
+        }
+
 		public IPrimitiveMap Properties
 		{
 			get
 			{
-				if(null == properties)
-				{
-					properties = PrimitiveMap.Unmarshal(MarshalledProperties);
-					propertyHelper = new MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties)
-					                     {AllowByteArrays = false};
-					
-					// Since JMS doesn't define a Byte array interface for properties we
-					// disable them here to prevent sending invalid data to the broker.
-				}
-
-				return propertyHelper;
+				throw new NotSupportedException("MQTT does not support Message properties.");
 			}
 		}
 
@@ -142,8 +156,8 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public string NMSCorrelationID
 		{
-			get { return CorrelationId; }
-			set { CorrelationId = value; }
+			get { return String.Empty; }
+			set {}
 		}
 
 		/// <summary>
@@ -151,38 +165,17 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public IDestination NMSDestination
 		{
-			get { return Destination; }
-            set { Destination = value as ActiveMQDestination; }
+			get { return destination; }
+            set { this.destination = value as Topic; }
 		}
 
-		private TimeSpan timeToLive = TimeSpan.FromMilliseconds(0);
 		/// <summary>
 		/// The time in milliseconds that this message should expire in
 		/// </summary>
 		public TimeSpan NMSTimeToLive
 		{
-			get
-			{
-				if(Expiration > 0 && timeToLive.TotalMilliseconds <= 0.0)
-				{
-					timeToLive = TimeSpan.FromMilliseconds(Expiration - Timestamp);
-				}
-
-				return timeToLive;
-			}
-
-			set
-			{
-				timeToLive = value;
-				if(timeToLive.TotalMilliseconds > 0)
-				{
-					Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
-				}
-				else
-				{
-					Expiration = 0;
-				}
-			}
+			get { return TimeSpan.MaxValue; }
+			set {}
 		}
 
 		/// <summary>
@@ -190,33 +183,8 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public string NMSMessageId
 		{
-			get
-			{
-			    return null != MessageId ? BaseDataStreamMarshaller.ToString(MessageId) : String.Empty;
-			}
-
-		    set
-            {
-                if(value != null) 
-                {
-                    try 
-                    {
-                        MessageId id = new MessageId(value);
-                        this.MessageId = id;
-                    } 
-                    catch(FormatException) 
-                    {
-                        // we must be some foreign JMS provider or strange user-supplied
-                        // String so lets set the IDs to be 1
-                        MessageId id = new MessageId();
-                        this.MessageId = id;
-                    }
-                } 
-                else
-                {
-                    this.MessageId = null;
-                }
-            }
+			get { return this.messageId.ToString(); }
+			set { this.messageId = Int16.Parse(value); }
 		}
 
 		/// <summary>
@@ -224,8 +192,8 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public MsgDeliveryMode NMSDeliveryMode
 		{
-			get { return (Persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
-			set { Persistent = (MsgDeliveryMode.Persistent == value); }
+			get { return (persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
+			set { persistent = (MsgDeliveryMode.Persistent == value); }
 		}
 
 		/// <summary>
@@ -233,8 +201,8 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public MsgPriority NMSPriority
 		{
-			get { return (MsgPriority) Priority; }
-			set { Priority = (byte) value; }
+			get { return MsgPriority.Normal; }
+			set {}
 		}
 
 		/// <summary>
@@ -242,22 +210,22 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public bool NMSRedelivered
 		{
-			get { return (RedeliveryCounter > 0); }
+			get { return (redeliveryCounter > 0); }
 
             set
             {
                 if(value == true)
                 {
-                    if(this.RedeliveryCounter <= 0)
+                    if(this.redeliveryCounter <= 0)
                     {
-                        this.RedeliveryCounter = 1;
+                        this.redeliveryCounter = 1;
                     }
                 }
                 else
                 {
-                    if(this.RedeliveryCounter > 0)
+                    if(this.redeliveryCounter > 0)
                     {
-                        this.RedeliveryCounter = 0;
+                        this.redeliveryCounter = 0;
                     }
                 }
             }
@@ -268,8 +236,8 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public IDestination NMSReplyTo
 		{
-			get { return ReplyTo; }
-			set { ReplyTo = ActiveMQDestination.Transform(value); }
+			get { return null; }
+			set { }
 		}
 
 		/// <summary>
@@ -277,15 +245,8 @@ namespace Apache.NMS.MQTT.Messages
 		/// </summary>
 		public DateTime NMSTimestamp
 		{
-			get { return DateUtils.ToDateTime(Timestamp); }
-			set
-			{
-				Timestamp = DateUtils.ToJavaTimeUtc(value);
-				if(timeToLive.TotalMilliseconds > 0)
-				{
-					Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
-				}
-			}
+			get { return DateTime.Now; }
+			set {}
 		}
 
 		/// <summary>
@@ -297,7 +258,7 @@ namespace Apache.NMS.MQTT.Messages
 			set {  }
 		}
 
-		public int MessageId
+		public short MessageId
 		{
 			get { return this.messageId; }
 			set { this.messageId = value; }
@@ -305,15 +266,6 @@ namespace Apache.NMS.MQTT.Messages
 
 		#endregion
 
-		public object GetObjectProperty(string name)
-		{
-			return Properties[name];
-		}
-
-		public void SetObjectProperty(string name, object value)
-		{
-			Properties[name] = value;
-		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs Tue Nov 19 16:38:19 2013
@@ -35,22 +35,22 @@ namespace Apache.NMS.MQTT
 			get { return this.name; }
 		}
 
-		DestinationType DestinationType 
+		public DestinationType DestinationType 
 		{ 
 			get { return DestinationType.Topic; }
 		}
 		
-		bool IsTopic
+		public bool IsTopic
 		{ 
 			get { return true; }
 		}
 		
-		bool IsQueue 
+		public bool IsQueue 
 		{ 
 			get { return false; }
 		}
 		
-		bool IsTemporary 
+		public bool IsTemporary 
 		{ 
 			get { return false; }
 		}

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs Tue Nov 19 16:38:19 2013
@@ -29,6 +29,16 @@ namespace Apache.NMS.MQTT.Transport
             set { this.commandId = value; }
         }
 
+		public virtual int CommandType
+		{
+			get { return 0; }
+		}
+
+		public virtual string CommandName
+		{
+			get { return this.GetType().Name; }
+		}
+
         public override int GetHashCode()
         {
             return (CommandId * 37) + CommandType;
@@ -143,14 +153,20 @@ namespace Apache.NMS.MQTT.Transport
 
         public virtual Object Clone()
         {
-            // Since we are a derived class use the base's Clone()
-            // to perform the shallow copy. Since it is shallow it
-            // will include our derived class. Since we are derived,
-            // this method is an override.
-            BaseCommand o = (BaseCommand) base.Clone();
-
-            return o;
+            return this.MemberwiseClone();
         }
+
+		public int HashCode(object value)
+		{
+			if(value != null)
+			{
+				return value.GetHashCode();
+			}
+			else
+			{
+				return -1;
+			}
+		}
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Tue Nov 19 16:38:19 2013
@@ -91,6 +91,8 @@
     <Compile Include="src\main\csharp\Transport\MQTTTransportFactoryAttribute.cs" />
     <Compile Include="src\main\csharp\Commands\PUBCOMP.cs" />
     <Compile Include="src\main\csharp\RequestTimedOutException.cs" />
+    <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
+    <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
   </ItemGroup>
   <ItemGroup>
     <Folder Include="keyfile\" />