You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/19 17:38:20 UTC
svn commit: r1543485 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk:
./ src/main/csharp/ src/main/csharp/Messages/ src/main/csharp/Transport/
Author: tabish
Date: Tue Nov 19 16:38:19 2013
New Revision: 1543485
URL: http://svn.apache.org/r1543485
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs Tue Nov 19 16:38:19 2013
@@ -22,6 +22,6 @@ using System.Runtime.InteropServices;
[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
[assembly: AssemblyTrademarkAttribute("")]
[assembly: AssemblyCultureAttribute("")]
-[assembly: AssemblyVersionAttribute("1.7.0.3237")]
+[assembly: AssemblyVersionAttribute("1.7.0.3244")]
[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Tue Nov 19 16:38:19 2013
@@ -302,29 +302,29 @@ namespace Apache.NMS.MQTT
}
}
-// internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
-// {
-// if(!this.closing.Value)
-// {
-// this.dispatchers.Add(id, dispatcher);
-// }
-// }
-//
-// internal void RemoveDispatcher(ConsumerId id)
-// {
-// if(!this.closing.Value)
-// {
-// this.dispatchers.Remove(id);
-// }
-// }
-//
-// internal void AddProducer(ProducerId id, MessageProducer producer)
-// {
-// if(!this.closing.Value)
-// {
-// this.producers.Add(id, producer);
-// }
-// }
+ internal void AddDispatcher(int id, IDispatcher dispatcher)
+ {
+ if(!this.closing.Value)
+ {
+ this.dispatchers.Add(id, dispatcher);
+ }
+ }
+
+ internal void RemoveDispatcher(int id)
+ {
+ if(!this.closing.Value)
+ {
+ this.dispatchers.Remove(id);
+ }
+ }
+
+ internal void AddProducer(int id, MessageProducer producer)
+ {
+ if(!this.closing.Value)
+ {
+ this.producers.Add(id, producer);
+ }
+ }
internal void RemoveProducer(int id)
{
@@ -520,7 +520,6 @@ namespace Apache.NMS.MQTT
}
}
}
-
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs Tue Nov 19 16:38:19 2013
@@ -36,7 +36,6 @@ namespace Apache.NMS.MQTT.Messages
public virtual void OnSend()
{
- base.OnSend();
StoreContent();
}
@@ -491,7 +490,8 @@ namespace Apache.NMS.MQTT.Messages
EndianBinaryReader reader = new EndianBinaryReader(target);
this.length = reader.ReadInt32();
- target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
+ // TODO: decompression is disabled for now; the original call is kept (commented out) below.
+ // target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
}
else
{
@@ -510,14 +510,14 @@ namespace Apache.NMS.MQTT.Messages
this.outputBuffer = new MemoryStream();
Stream target = this.outputBuffer;
- if(this.Connection != null && this.Connection.UseCompression)
- {
- this.length = 0;
- this.Compressed = true;
-
- target = this.Connection.CompressionPolicy.CreateCompressionStream(target);
- target = new LengthTrackerStream(target, this);
- }
+// if(this.Connection != null && this.Connection.UseCompression)
+// {
+// this.length = 0;
+// this.Compressed = true;
+//
+// target = this.Connection.CompressionPolicy.CreateCompressionStream(target);
+// target = new LengthTrackerStream(target, this);
+// }
this.dataOut = new EndianBinaryWriter(target);
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Tue Nov 19 16:38:19 2013
@@ -26,14 +26,19 @@ namespace Apache.NMS.MQTT.Messages
public class MQTTMessage : IMessage, ICloneable
{
private readonly PUBLISH publish = new PUBLISH();
- private MessagePropertyIntercepter propertyHelper;
- private PrimitiveMap properties;
private Connection connection;
private Topic destination;
private short messageId;
+ private byte[] content;
+ private bool compressed;
+ private int redeliveryCounter;
+ private bool persistent;
public event AcknowledgeHandler Acknowledger;
+ private bool readOnlyMsgProperties;
+ private bool readOnlyMsgBody;
+
public static MQTTMessage Transform(IMessage message)
{
return (MQTTMessage) message;
@@ -45,17 +50,12 @@ namespace Apache.NMS.MQTT.Messages
public override int GetHashCode()
{
- MessageId id = this.MessageId;
-
- return id != null ? id.GetHashCode() : base.GetHashCode();
+ return messageId != 0 ? messageId : base.GetHashCode();
}
public virtual object Clone()
{
- MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
-
- cloneMessage.propertyHelper = new MessagePropertyIntercepter(cloneMessage, cloneMessage.properties, this.ReadOnlyProperties) { AllowByteArrays = false };
- return cloneMessage;
+ return this.MemberwiseClone();
}
public override bool Equals(object that)
@@ -69,10 +69,10 @@ namespace Apache.NMS.MQTT.Messages
public virtual bool Equals(MQTTMessage that)
{
- MessageId oMsg = that.MessageId;
- MessageId thisMsg = this.MessageId;
+ short oMsg = that.MessageId;
+ short thisMsg = this.MessageId;
- return thisMsg != null && oMsg != null && oMsg.Equals(thisMsg);
+ return thisMsg != 0 && oMsg != 0 && oMsg == thisMsg;
}
public void Acknowledge()
@@ -113,21 +113,35 @@ namespace Apache.NMS.MQTT.Messages
#region Properties
+ public byte[] Content
+ {
+ get { return content; }
+ set { this.content = value; }
+ }
+
+ public bool Compressed
+ {
+ get { return compressed; }
+ set { this.compressed = value; }
+ }
+
+ public virtual bool ReadOnlyProperties
+ {
+ get { return this.readOnlyMsgProperties; }
+ set { this.readOnlyMsgProperties = value; }
+ }
+
+ public virtual bool ReadOnlyBody
+ {
+ get { return this.readOnlyMsgBody; }
+ set { this.readOnlyMsgBody = value; }
+ }
+
public IPrimitiveMap Properties
{
get
{
- if(null == properties)
- {
- properties = PrimitiveMap.Unmarshal(MarshalledProperties);
- propertyHelper = new MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties)
- {AllowByteArrays = false};
-
- // Since JMS doesn't define a Byte array interface for properties we
- // disable them here to prevent sending invalid data to the broker.
- }
-
- return propertyHelper;
+ throw new NotSupportedException("MQTT does not support Message properties.");
}
}
@@ -142,8 +156,8 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public string NMSCorrelationID
{
- get { return CorrelationId; }
- set { CorrelationId = value; }
+ get { return String.Empty; }
+ set {}
}
/// <summary>
@@ -151,38 +165,17 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public IDestination NMSDestination
{
- get { return Destination; }
- set { Destination = value as ActiveMQDestination; }
+ get { return destination; }
+ set { this.destination = value as Topic; }
}
- private TimeSpan timeToLive = TimeSpan.FromMilliseconds(0);
/// <summary>
/// The time in milliseconds that this message should expire in
/// </summary>
public TimeSpan NMSTimeToLive
{
- get
- {
- if(Expiration > 0 && timeToLive.TotalMilliseconds <= 0.0)
- {
- timeToLive = TimeSpan.FromMilliseconds(Expiration - Timestamp);
- }
-
- return timeToLive;
- }
-
- set
- {
- timeToLive = value;
- if(timeToLive.TotalMilliseconds > 0)
- {
- Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
- }
- else
- {
- Expiration = 0;
- }
- }
+ get { return TimeSpan.MaxValue; }
+ set {}
}
/// <summary>
@@ -190,33 +183,8 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public string NMSMessageId
{
- get
- {
- return null != MessageId ? BaseDataStreamMarshaller.ToString(MessageId) : String.Empty;
- }
-
- set
- {
- if(value != null)
- {
- try
- {
- MessageId id = new MessageId(value);
- this.MessageId = id;
- }
- catch(FormatException)
- {
- // we must be some foreign JMS provider or strange user-supplied
- // String so lets set the IDs to be 1
- MessageId id = new MessageId();
- this.MessageId = id;
- }
- }
- else
- {
- this.MessageId = null;
- }
- }
+ get { return this.messageId.ToString(); }
+ set { this.messageId = Int16.Parse(value); }
}
/// <summary>
@@ -224,8 +192,8 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public MsgDeliveryMode NMSDeliveryMode
{
- get { return (Persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
- set { Persistent = (MsgDeliveryMode.Persistent == value); }
+ get { return (persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
+ set { persistent = (MsgDeliveryMode.Persistent == value); }
}
/// <summary>
@@ -233,8 +201,8 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public MsgPriority NMSPriority
{
- get { return (MsgPriority) Priority; }
- set { Priority = (byte) value; }
+ get { return MsgPriority.Normal; }
+ set {}
}
/// <summary>
@@ -242,22 +210,22 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public bool NMSRedelivered
{
- get { return (RedeliveryCounter > 0); }
+ get { return (redeliveryCounter > 0); }
set
{
if(value == true)
{
- if(this.RedeliveryCounter <= 0)
+ if(this.redeliveryCounter <= 0)
{
- this.RedeliveryCounter = 1;
+ this.redeliveryCounter = 1;
}
}
else
{
- if(this.RedeliveryCounter > 0)
+ if(this.redeliveryCounter > 0)
{
- this.RedeliveryCounter = 0;
+ this.redeliveryCounter = 0;
}
}
}
@@ -268,8 +236,8 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public IDestination NMSReplyTo
{
- get { return ReplyTo; }
- set { ReplyTo = ActiveMQDestination.Transform(value); }
+ get { return null; }
+ set { }
}
/// <summary>
@@ -277,15 +245,8 @@ namespace Apache.NMS.MQTT.Messages
/// </summary>
public DateTime NMSTimestamp
{
- get { return DateUtils.ToDateTime(Timestamp); }
- set
- {
- Timestamp = DateUtils.ToJavaTimeUtc(value);
- if(timeToLive.TotalMilliseconds > 0)
- {
- Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
- }
- }
+ get { return DateTime.Now; }
+ set {}
}
/// <summary>
@@ -297,7 +258,7 @@ namespace Apache.NMS.MQTT.Messages
set { }
}
- public int MessageId
+ public short MessageId
{
get { return this.messageId; }
set { this.messageId = value; }
@@ -305,15 +266,6 @@ namespace Apache.NMS.MQTT.Messages
#endregion
- public object GetObjectProperty(string name)
- {
- return Properties[name];
- }
-
- public void SetObjectProperty(string name, object value)
- {
- Properties[name] = value;
- }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs Tue Nov 19 16:38:19 2013
@@ -35,22 +35,22 @@ namespace Apache.NMS.MQTT
get { return this.name; }
}
- DestinationType DestinationType
+ public DestinationType DestinationType
{
get { return DestinationType.Topic; }
}
- bool IsTopic
+ public bool IsTopic
{
get { return true; }
}
- bool IsQueue
+ public bool IsQueue
{
get { return false; }
}
- bool IsTemporary
+ public bool IsTemporary
{
get { return false; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs Tue Nov 19 16:38:19 2013
@@ -29,6 +29,16 @@ namespace Apache.NMS.MQTT.Transport
set { this.commandId = value; }
}
+ public virtual int CommandType
+ {
+ get { return 0; }
+ }
+
+ public virtual string CommandName
+ {
+ get { return this.GetType().Name; }
+ }
+
public override int GetHashCode()
{
return (CommandId * 37) + CommandType;
@@ -143,14 +153,20 @@ namespace Apache.NMS.MQTT.Transport
public virtual Object Clone()
{
- // Since we are a derived class use the base's Clone()
- // to perform the shallow copy. Since it is shallow it
- // will include our derived class. Since we are derived,
- // this method is an override.
- BaseCommand o = (BaseCommand) base.Clone();
-
- return o;
+ return this.MemberwiseClone();
}
+
+ public int HashCode(object value)
+ {
+ if(value != null)
+ {
+ return value.GetHashCode();
+ }
+ else
+ {
+ return -1;
+ }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1543485&r1=1543484&r2=1543485&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Tue Nov 19 16:38:19 2013
@@ -91,6 +91,8 @@
<Compile Include="src\main\csharp\Transport\MQTTTransportFactoryAttribute.cs" />
<Compile Include="src\main\csharp\Commands\PUBCOMP.cs" />
<Compile Include="src\main\csharp\RequestTimedOutException.cs" />
+ <Compile Include="src\main\csharp\Threads\DefaultThreadPools.cs" />
+ <Compile Include="src\main\csharp\Threads\PooledTaskRunner.cs" />
</ItemGroup>
<ItemGroup>
<Folder Include="keyfile\" />