You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/19 19:32:02 UTC
svn commit: r1543529 - in
/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src: main/csharp/
main/csharp/Commands/ main/csharp/Util/ test/csharp/
Author: tabish
Date: Tue Nov 19 18:32:02 2013
New Revision: 1543529
URL: http://svn.apache.org/r1543529
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Added:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs Tue Nov 19 18:32:02 2013
@@ -30,12 +30,12 @@ namespace Apache.NMS.MQTT.Commands
set { this.returnCode = value; }
}
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "CONNACK"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs Tue Nov 19 18:32:02 2013
@@ -32,12 +32,12 @@ namespace Apache.NMS.MQTT.Commands
public const byte TYPE = 1;
public const String PROTOCOL_NAME = "MQIsdp";
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "CONNECT"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 14;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "DISCONNECT"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 11;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PINGREQ"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 13;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PINGRESP"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 4;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PUBACK"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBCOMP.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 7;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PUBCOMP"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs Tue Nov 19 18:32:02 2013
@@ -32,12 +32,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 3;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PUBLISH"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 5;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PUBREC"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 6;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "PUBREL"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs Tue Nov 19 18:32:02 2013
@@ -29,12 +29,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 8;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "SUBACK"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs Tue Nov 19 18:32:02 2013
@@ -27,12 +27,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 7;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "SUBSCRIBE"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 10;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "UNSUBACK"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs Tue Nov 19 18:32:02 2013
@@ -23,12 +23,12 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 9;
- public int CommandType
+ public override int CommandType
{
get { return TYPE; }
}
- public string CommandName
+ public override string CommandName
{
get { return "UNSUBSCRIBE"; }
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Tue Nov 19 18:32:02 2013
@@ -28,7 +28,6 @@ namespace Apache.NMS.MQTT
{
public class Connection : IConnection
{
- private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(Timeout.Infinite);
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
@@ -38,14 +37,12 @@ namespace Apache.NMS.MQTT
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
- private readonly object myLock = new object();
private readonly Atomic<bool> connected = new Atomic<bool>(false);
private readonly Atomic<bool> closed = new Atomic<bool>(false);
private readonly Atomic<bool> closing = new Atomic<bool>(false);
private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
private readonly object connectedLock = new object();
private Exception firstFailureError = null;
- private bool userSpecifiedClientID;
private int sessionCounter = 0;
private readonly Atomic<bool> started = new Atomic<bool>(false);
private ConnectionMetaData metaData = null;
@@ -55,6 +52,8 @@ namespace Apache.NMS.MQTT
private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
private readonly IdGenerator clientIdGenerator;
private IRedeliveryPolicy redeliveryPolicy;
+ private Scheduler scheduler = null;
+ private bool userSpecifiedClientID;
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -63,6 +62,7 @@ namespace Apache.NMS.MQTT
SetTransport(transport);
+ this.messageTransformation = new MQTTMessageTransformation(this);
this.info = new CONNECT();
}
@@ -121,21 +121,21 @@ namespace Apache.NMS.MQTT
get { return info.ClientId; }
set
{
- if(this.connected.Value)
- {
- throw new NMSException("You cannot change the ClientId once the Connection is connected");
- }
-
- this.info.ClientId = value;
- this.userSpecifiedClientID = true;
- CheckConnected();
+ if(this.connected.Value)
+ {
+ throw new NMSException("You cannot change the ClientId once the Connection is connected");
+ }
+
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
+ CheckConnected();
}
}
/// <summary>
/// The Default Client Id used if the ClientId property is not set explicity.
/// </summary>
- public string DefaultClientId
+ internal string DefaultClientId
{
set
{
@@ -282,7 +282,8 @@ namespace Apache.NMS.MQTT
protected virtual Session CreateMQTTSession(AcknowledgementMode ackMode)
{
CheckConnected();
- return new Session(this, ackMode);
+ int sessionId = Interlocked.Increment(ref sessionCounter);
+ return new Session(this, ackMode, sessionId);
}
internal void AddSession(Session session)
@@ -340,7 +341,81 @@ namespace Apache.NMS.MQTT
public void Close()
{
- // TODO
+ if(!this.closed.Value && !transportFailed.Value)
+ {
+ this.Stop();
+ }
+
+ lock(connectedLock)
+ {
+ if(this.closed.Value)
+ {
+ return;
+ }
+
+ try
+ {
+ Tracer.InfoFormat("Connection[{0}]: Closing Connection Now.", this.ClientId);
+ this.closing.Value = true;
+
+ Scheduler scheduler = this.scheduler;
+ if (scheduler != null)
+ {
+ try
+ {
+ scheduler.Stop();
+ }
+ catch (Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ lock(sessions.SyncRoot)
+ {
+ foreach(Session session in sessions)
+ {
+ session.Shutdown();
+ }
+ }
+ sessions.Clear();
+
+ // Connected is true only when we've successfully sent our CONNECT
+ // to the broker, so if we haven't announced ourselves there's no need to
+ // inform the broker of a remove, and if the transport is failed, why bother.
+ if(connected.Value && !transportFailed.Value)
+ {
+ DISCONNECT disconnect = new DISCONNECT();
+ transport.Oneway(disconnect);
+ }
+
+ executor.Shutdown();
+ if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+ {
+ Tracer.DebugFormat("Connection[{0}]: Failed to properly shutdown its executor", this.ClientId);
+ }
+
+ Tracer.DebugFormat("Connection[{0}]: Disposing of the Transport.", this.ClientId);
+ transport.Stop();
+ transport.Dispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Connection[{0}]: Error during connection close: {1}", ClientId, ex);
+ }
+ finally
+ {
+ if(executor != null)
+ {
+ executor.Shutdown();
+ }
+
+ this.transport = null;
+ this.closed.Value = true;
+ this.connected.Value = false;
+ this.closing.Value = false;
+ }
+ }
}
public void Dispose()
@@ -520,6 +595,36 @@ namespace Apache.NMS.MQTT
}
}
}
+
+ internal Scheduler Scheduler
+ {
+ get
+ {
+ Scheduler result = this.scheduler;
+ if (result == null)
+ {
+ lock (this)
+ {
+ result = scheduler;
+ if (result == null)
+ {
+ CheckClosed();
+ try
+ {
+ result = scheduler = new Scheduler(
+ "MQTTConnection["+this.info.ClientId+"] Scheduler");
+ scheduler.Start();
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+ }
+ }
+ return result;
+ }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs Tue Nov 19 18:32:02 2013
@@ -107,6 +107,8 @@ namespace Apache.NMS.MQTT
if(this.clientId != null)
{
+ // Set the connection factory version as the default, the user can
+ // still override this via a call to Connection.ClientId = XXX
connection.DefaultClientId = this.clientId;
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Tue Nov 19 18:32:02 2013
@@ -38,11 +38,39 @@ namespace Apache.NMS.MQTT
private ThreadPoolExecutor executor;
private int consumerId;
protected bool disposed = false;
+ private Topic destination = null;
private event MessageListener listener;
- public MessageConsumer()
+ public MessageConsumer(Session session, Topic destination, int consumerId)
{
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ }
+ else if(destination.TopicName == null)
+ {
+ throw new InvalidDestinationException("The destination object was not given a physical name.");
+ }
+
+ this.session = session;
+ this.consumerId = consumerId;
+ this.destination = destination;
+ this.messageTransformation = this.session.Connection.MessageTransformation;
+ this.unconsumedMessages = new FifoMessageDispatchChannel();
+
+ // If the destination contained a URI query, then use it to set public properties
+ // on the ConsumerInfo
+ if(destination.Options != null)
+ {
+ // Get options prefixed with "consumer.*"
+ StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+ // Extract out custom extension options "consumer.nms.*"
+ StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
+ URISupport.SetProperties(this, options);
+ URISupport.SetProperties(this, customConsumerOptions, "nms.");
+ }
}
#region Property Accessors
@@ -94,6 +122,11 @@ namespace Apache.NMS.MQTT
get { return this.consumerId; }
}
+ public Topic Destination
+ {
+ get { return this.destination; }
+ }
+
#endregion
public void Start()
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Tue Nov 19 18:32:02 2013
@@ -36,11 +36,20 @@ namespace Apache.NMS.MQTT
private readonly MessageTransformation messageTransformation;
- public MessageProducer(Session session, TimeSpan requestTimeout)
+ public MessageProducer(Session session, Topic destination, TimeSpan requestTimeout, int producerId)
{
this.session = session;
this.RequestTimeout = requestTimeout;
+ this.producerId = producerId;
+ this.destination = destination;
this.messageTransformation = session.Connection.MessageTransformation;
+
+ // If the destination contained a URI query, then use it to set public
+ // properties on the ProducerInfo
+ if (destination != null && destination.Options != null)
+ {
+ URISupport.SetProperties(this, destination.Options, "producer.");
+ }
}
~MessageProducer()
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Tue Nov 19 18:32:02 2013
@@ -16,6 +16,7 @@
//
using System;
using System.Collections;
+using System.Threading;
using Apache.NMS.MQTT.Messages;
namespace Apache.NMS.MQTT
@@ -43,9 +44,10 @@ namespace Apache.NMS.MQTT
private readonly AcknowledgementMode acknowledgementMode;
private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+ private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private TimeSpan requestTimeout;
- public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+ public Session(Connection connection, AcknowledgementMode acknowledgementMode, int sessionId)
{
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
@@ -54,6 +56,7 @@ namespace Apache.NMS.MQTT
this.ProducerTransformer = connection.ProducerTransformer;
this.executor = new SessionExecutor(this, this.consumers);
+ this.sessionId = sessionId;
if(connection.IsStarted)
{
@@ -146,6 +149,11 @@ namespace Apache.NMS.MQTT
set { this.requestTimeout = value; }
}
+ public int SessionId
+ {
+ get { return this.sessionId; }
+ }
+
private ConsumerTransformerDelegate consumerTransformer;
/// <summary>
/// A Delegate that is called each time a Message is dispatched to allow the client to do
@@ -221,8 +229,65 @@ namespace Apache.NMS.MQTT
internal void DoClose()
{
+ Shutdown();
}
+ internal void Shutdown()
+ {
+ Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.SessionId);
+
+ if(this.closed)
+ {
+ return;
+ }
+
+ lock(myLock)
+ {
+ if(this.closed || this.closing)
+ {
+ return;
+ }
+
+ try
+ {
+ this.closing = true;
+
+ // Stop all message deliveries from this Session
+ this.executor.Stop(this.closeStopTimeout);
+
+ lock(consumers.SyncRoot)
+ {
+ foreach(MessageConsumer consumer in consumers.Values)
+ {
+ consumer.FailureError = this.connection.FirstFailureError;
+ consumer.Shutdown();
+ }
+ }
+ consumers.Clear();
+
+ lock(producers.SyncRoot)
+ {
+ foreach(MessageProducer producer in producers.Values)
+ {
+ producer.Shutdown();
+ }
+ }
+ producers.Clear();
+
+ Connection.RemoveSession(this);
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during session close: {0}", ex);
+ }
+ finally
+ {
+ this.closed = true;
+ this.closing = false;
+ }
+ }
+ }
+
public IMessageProducer CreateProducer()
{
return CreateProducer(null);
@@ -476,12 +541,22 @@ namespace Apache.NMS.MQTT
/// if the message is in a transaction.
/// </summary>
/// <param name="message">
- /// A <see cref="ActiveMQMessage"/>
+ /// A <see cref="MQTTMessage"/>
/// </param>
private static void DoNothingAcknowledge(MQTTMessage message)
{
}
+ private int NextConsumerId
+ {
+ get { return Interlocked.Increment(ref this.consumerCounter); }
+ }
+
+ private int NextProducerId
+ {
+ get { return Interlocked.Increment(ref this.producerCounter); }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs Tue Nov 19 18:32:02 2013
@@ -15,6 +15,7 @@
// limitations under the License.
//
using System;
+using System.Collections.Specialized;
namespace Apache.NMS.MQTT
{
@@ -23,7 +24,8 @@ namespace Apache.NMS.MQTT
/// </summary>
public class Topic : ITopic
{
- string name;
+ private string name;
+ private StringDictionary options = null;
public Topic(string name)
{
@@ -54,6 +56,16 @@ namespace Apache.NMS.MQTT
{
get { return false; }
}
+
+ /// <summary>
+ /// Dictionary of name/value pairs representing option values specified
+ /// in the URI used to create this Destination. A null value is returned
+ /// if no options were specified.
+ /// </summary>
+ internal StringDictionary Options
+ {
+ get { return this.options; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs?rev=1543529&r1=1543528&r2=1543529&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs Tue Nov 19 18:32:02 2013
@@ -17,7 +17,7 @@
using System;
using Apache.NMS.Util;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
namespace Apache.NMS.MQTT.Util
{
@@ -39,7 +39,9 @@ namespace Apache.NMS.MQTT.Util
protected override IBytesMessage DoCreateBytesMessage()
{
- return null;
+ BytesMessage message = new BytesMessage();
+ message.Connection = connection;
+ return message;
}
protected override ITextMessage DoCreateTextMessage()
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=1543529&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs Tue Nov 19 18:32:02 2013
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// ------------------------------------------------------------------------------
+// <autogenerated>
+// This code was generated by a tool.
+// Mono Runtime Version: 4.0.30319.1
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </autogenerated>
+// ------------------------------------------------------------------------------
+
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyDescriptionAttribute("Apache NMS for MQTT Class Library (.Net Messaging Library Implementation): An implementation of the NMS API for MQTT")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
+[assembly: AssemblyProductAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("1.7.0.3244")]
+[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/test/csharp/CommonAssemblyInfo.cs
------------------------------------------------------------------------------
svn:eol-style = native