You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/16 00:43:14 UTC
svn commit: r1542430 [1/2] - in
/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk: ./ src/main/csharp/
src/main/csharp/Commands/ src/main/csharp/Messages/
src/main/csharp/Threads/ src/main/csharp/Transport/ src/main/csharp/Util/
Author: tabish
Date: Fri Nov 15 23:43:13 2013
New Revision: 1542430
URL: http://svn.apache.org/r1542430
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Added:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Topic.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/BaseCommand.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/FutureResponse.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/MQTTTransportFactoryAttribute.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Response.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/TransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FactoryAttribute.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FactoryFinder.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/IdGenerator.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTDestination.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/ (props changed)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Nov 15 23:43:13 2013
@@ -2,3 +2,4 @@
build
lib
vs2008-mqtt.userprefs
+bin
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNACK.cs Fri Nov 15 23:43:13 2013
@@ -16,7 +16,7 @@
//
using System;
-namespace Apache.NMS.MQTT.Command
+namespace Apache.NMS.MQTT.Commands
{
public class CONNACK
{
@@ -32,6 +32,17 @@ namespace Apache.NMS.MQTT.Command
public CONNACK()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "CONNACK"; }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs Fri Nov 15 23:43:13 2013
@@ -31,6 +31,16 @@ namespace Apache.NMS.MQTT.Commands
public const byte TYPE = 1;
public const String PROTOCOL_NAME = "MQIsdp";
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "CONNECT"; }
+ }
+
private byte version = 3;
public byte Version
{
@@ -66,6 +76,13 @@ namespace Apache.NMS.MQTT.Commands
set { this.password = value; }
}
+ private String clientId;
+ public String ClientId
+ {
+ get { return this.clientId; }
+ set { this.clientId = value; }
+ }
+
private bool cleanSession;
public bool CleanSession
{
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/DISCONNECT.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public DISCONNECT()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "DISCONNECT"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGREQ.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,17 @@ namespace Apache.NMS.MQTT.Commands
public PINGREQ()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "PINGREQ"; }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PINGRESP.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public PINGRESP()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "PINGRESP"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBACK.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public PUBACK()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "PUBACK"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBLISH.cs Fri Nov 15 23:43:13 2013
@@ -31,6 +31,16 @@ namespace Apache.NMS.MQTT.Commands
{
public const byte TYPE = 3;
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "PUBLISH"; }
+ }
+
private byte qosLevel;
public byte QoSLevel
{
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREC.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public PUBREC()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "PUBREC"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/PUBREL.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public PUBREL()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "PUBREL"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBACK.cs Fri Nov 15 23:43:13 2013
@@ -31,6 +31,16 @@ namespace Apache.NMS.MQTT.Commands
public SUBACK()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "SUBACK"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/SUBSCRIBE.cs Fri Nov 15 23:43:13 2013
@@ -29,6 +29,16 @@ namespace Apache.NMS.MQTT.Commands
public SUBSCRIBE()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "SUBSCRIBE"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBACK.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public UNSUBACK()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "UNSUBACK"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/UNSUBSCRIBE.cs Fri Nov 15 23:43:13 2013
@@ -25,6 +25,16 @@ namespace Apache.NMS.MQTT.Commands
public UNSUBSCRIBE()
{
}
+
+ public int CommandType
+ {
+ get { return TYPE; }
+ }
+
+ public int CommandName
+ {
+ get { return "UNSUBSCRIBE"; }
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Fri Nov 15 23:43:13 2013
@@ -15,13 +15,23 @@
// limitations under the License.
//
using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.Util;
using Apache.NMS.MQTT.Transport;
+using Apache.NMS.MQTT.Threads;
using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
namespace Apache.NMS.MQTT
{
public class Connection : IConnection
{
+ private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
+ private static readonly TimeSpan InfiniteTimeSpan = TimeSpan.FromMilliseconds(Timeout.Infinite);
+
+ private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private readonly CONNECT info = null;
private ITransport transport;
private readonly Uri brokerUri;
@@ -32,15 +42,19 @@ namespace Apache.NMS.MQTT
private readonly Atomic<bool> closed = new Atomic<bool>(false);
private readonly Atomic<bool> closing = new Atomic<bool>(false);
private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+ private readonly object connectedLock = new object();
private Exception firstFailureError = null;
+ private bool userSpecifiedClientID;
private int sessionCounter = 0;
private readonly Atomic<bool> started = new Atomic<bool>(false);
private ConnectionMetaData metaData = null;
private bool disposed = false;
+ private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout; // from connection factory
private readonly MessageTransformation messageTransformation;
private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+ private readonly IdGenerator clientIdGenerator;
- public Connection(Uri connectionUri, ITransport transport)
+ public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
this.brokerUri = connectionUri;
this.clientIdGenerator = clientIdGenerator;
@@ -100,6 +114,34 @@ namespace Apache.NMS.MQTT
set { this.info.Password = value; }
}
+ public string ClientId
+ {
+ get { return info.ClientId; }
+ set
+ {
+ if(this.connected.Value)
+ {
+ throw new NMSException("You cannot change the ClientId once the Connection is connected");
+ }
+
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
+ CheckConnected();
+ }
+ }
+
+ /// <summary>
+ /// The Default Client Id used if the ClientId property is not set explicity.
+ /// </summary>
+ public string DefaultClientId
+ {
+ set
+ {
+ this.info.ClientId = value;
+ this.userSpecifiedClientID = true;
+ }
+ }
+
/// <summary>
/// This property sets the acknowledgment mode for the connection.
/// The URI parameter connection.ackmode can be set to a string value
@@ -229,7 +271,7 @@ namespace Apache.NMS.MQTT
protected virtual Session CreateMQTTSession(AcknowledgementMode ackMode)
{
CheckConnected();
- return new Session(this, NextSessionId, ackMode);
+ return new Session(this, ackMode);
}
internal void AddSession(Session session)
@@ -249,41 +291,40 @@ namespace Apache.NMS.MQTT
}
}
- internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
- {
- if(!this.closing.Value)
- {
- this.dispatchers.Add(id, dispatcher);
- }
- }
-
- internal void RemoveDispatcher(ConsumerId id)
- {
- if(!this.closing.Value)
- {
- this.dispatchers.Remove(id);
- }
- }
-
- internal void AddProducer(ProducerId id, MessageProducer producer)
- {
- if(!this.closing.Value)
- {
- this.producers.Add(id, producer);
- }
- }
-
- internal void RemoveProducer(ProducerId id)
- {
- if(!this.closing.Value)
- {
- this.producers.Remove(id);
- }
- }
+// internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
+// {
+// if(!this.closing.Value)
+// {
+// this.dispatchers.Add(id, dispatcher);
+// }
+// }
+//
+// internal void RemoveDispatcher(ConsumerId id)
+// {
+// if(!this.closing.Value)
+// {
+// this.dispatchers.Remove(id);
+// }
+// }
+//
+// internal void AddProducer(ProducerId id, MessageProducer producer)
+// {
+// if(!this.closing.Value)
+// {
+// this.producers.Add(id, producer);
+// }
+// }
+//
+// internal void RemoveProducer(ProducerId id)
+// {
+// if(!this.closing.Value)
+// {
+// this.producers.Remove(id);
+// }
+// }
internal void RemoveDispatcher(IDispatcher dispatcher)
{
- this.connectionAudit.RemoveDispatcher(dispatcher);
}
public void Close()
@@ -316,6 +357,22 @@ namespace Apache.NMS.MQTT
disposed = true;
}
+ protected void OnCommand(ITransport commandTransport, Command command)
+ {
+ }
+
+ internal void OnTransportException(ITransport source, Exception cause)
+ {
+ }
+
+ protected void OnTransportInterrupted(ITransport sender)
+ {
+ }
+
+ protected void OnTransportResumed(ITransport sender)
+ {
+ }
+
protected void CheckClosedOrFailed()
{
CheckClosed();
@@ -333,6 +390,106 @@ namespace Apache.NMS.MQTT
}
}
+ /// <summary>
+ /// Check and ensure that the connection object is connected. If it is not
+ /// connected or is closed or closing, a ConnectionClosedException is thrown.
+ /// </summary>
+ internal void CheckConnected()
+ {
+ if(closed.Value)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if(!connected.Value)
+ {
+ DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
+ int waitCount = 1;
+
+ while(true)
+ {
+ if(Monitor.TryEnter(connectedLock))
+ {
+ try
+ {
+ if(closed.Value || closing.Value)
+ {
+ break;
+ }
+ else if(!connected.Value)
+ {
+ if(!this.userSpecifiedClientID)
+ {
+ this.info.ClientId = this.clientIdGenerator.GenerateId();
+ }
+
+ try
+ {
+ if(null != transport)
+ {
+ // Make sure the transport is started.
+ if(!this.transport.IsStarted)
+ {
+ this.transport.Start();
+ }
+
+ // Send the connection and see if an ack/nak is returned.
+ Response response = transport.Request(this.info, this.RequestTimeout);
+ if(!(response is ExceptionResponse))
+ {
+ connected.Value = true;
+ }
+ else
+ {
+ ExceptionResponse error = response as ExceptionResponse;
+ NMSException exception = CreateExceptionFromBrokerError(error.Exception);
+ if(exception is InvalidClientIDException)
+ {
+ // This is non-recoverable.
+ // Shutdown the transport connection, and re-create it, but don't start it.
+ // It will be started if the connection is re-attempted.
+ this.transport.Stop();
+ ITransport newTransport = TransportFactory.CreateTransport(this.brokerUri);
+ SetTransport(newTransport);
+ throw exception;
+ }
+ }
+ }
+ }
+ catch(BrokerException)
+ {
+ // We Swallow the generic version and throw ConnectionClosedException
+ }
+ catch(NMSException)
+ {
+ throw;
+ }
+ }
+ }
+ finally
+ {
+ Monitor.Exit(connectedLock);
+ }
+ }
+
+ if(connected.Value || closed.Value || closing.Value
+ || (DateTime.Now > timeoutTime && this.RequestTimeout != InfiniteTimeSpan))
+ {
+ break;
+ }
+
+ // Back off from being overly aggressive. Having too many threads
+ // aggressively trying to connect to a down broker pegs the CPU.
+ Thread.Sleep(5 * (waitCount++));
+ }
+
+ if(!connected.Value)
+ {
+ throw new ConnectionClosedException();
+ }
+ }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFactory.cs Fri Nov 15 23:43:13 2013
@@ -19,6 +19,8 @@ using System;
using System.Collections.Specialized;
using Apache.NMS.Util;
using Apache.NMS.Policies;
+using Apache.NMS.MQTT.Transport;
+using Apache.NMS.MQTT.Util;
namespace Apache.NMS.MQTT
{
@@ -36,7 +38,7 @@ namespace Apache.NMS.MQTT
private string connectionPassword;
private string clientId;
private string clientIdPrefix;
- //private IdGenerator clientIdGenerator;
+ private IdGenerator clientIdGenerator;
private bool copyMessageOnSend = true;
private bool asyncSend;
@@ -159,23 +161,17 @@ namespace Apache.NMS.MQTT
StringDictionary properties = URISupport.ParseQuery(query);
StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
- StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
if(connection != null)
{
URISupport.SetProperties(this, connection, "connection.");
}
-
- if(nms != null)
- {
- URISupport.SetProperties(this.PrefetchPolicy, nms, "nms.PrefetchPolicy.");
- URISupport.SetProperties(this.RedeliveryPolicy, nms, "nms.RedeliveryPolicy.");
- }
brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
}
}
}
+
public string UserName
{
get { return connectionUserName; }
@@ -259,29 +255,29 @@ namespace Apache.NMS.MQTT
}
}
-// public IdGenerator ClientIdGenerator
-// {
-// set { this.clientIdGenerator = value; }
-// get
-// {
-// lock(this)
-// {
-// if(this.clientIdGenerator == null)
-// {
-// if(this.clientIdPrefix != null)
-// {
-// this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
-// }
-// else
-// {
-// this.clientIdGenerator = new IdGenerator();
-// }
-// }
-//
-// return this.clientIdGenerator;
-// }
-// }
-// }
+ public IdGenerator ClientIdGenerator
+ {
+ set { this.clientIdGenerator = value; }
+ get
+ {
+ lock(this)
+ {
+ if(this.clientIdGenerator == null)
+ {
+ if(this.clientIdPrefix != null)
+ {
+ this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
+ }
+ else
+ {
+ this.clientIdGenerator = new IdGenerator();
+ }
+ }
+
+ return this.clientIdGenerator;
+ }
+ }
+ }
public event ExceptionListener OnException
{
@@ -313,15 +309,8 @@ namespace Apache.NMS.MQTT
protected virtual void ConfigureConnection(Connection connection)
{
- connection.AsyncSend = this.AsyncSend;
- connection.CopyMessageOnSend = this.CopyMessageOnSend;
- connection.AlwaysSyncSend = this.AlwaysSyncSend;
- connection.SendAcksAsync = this.SendAcksAsync;
- connection.DispatchAsync = this.DispatchAsync;
connection.AcknowledgementMode = this.acknowledgementMode;
connection.RequestTimeout = this.requestTimeout;
- connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
- connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
}
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs?rev=1542430&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs Fri Nov 15 23:43:13 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT
+{
+ /// <summary>
+ /// Exception thrown when a connection is used that it has failed in some way.
+ /// </summary>
+ [Serializable]
+ public class ConnectionFailedException : NMSException
+ {
+ public ConnectionFailedException()
+ : base("The connection has failed!")
+ {
+ }
+
+ public ConnectionFailedException(string message)
+ : base(message)
+ {
+ }
+
+ public ConnectionFailedException(string message, string errorCode)
+ : base(message, errorCode)
+ {
+ }
+
+ public ConnectionFailedException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
+
+ public ConnectionFailedException(string message, string errorCode, Exception innerException)
+ : base(message, errorCode, innerException)
+ {
+ }
+
+ #region ISerializable interface implementation
+
+ /// <summary>
+ /// Initializes a new instance of the ConnectionFailedException class with serialized data.
+ /// Throws System.ArgumentNullException if the info parameter is null.
+ /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+ /// </summary>
+ /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+ /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+ protected ConnectionFailedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+ : base(info, context)
+ {
+ }
+
+ #endregion
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionFailedException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs Fri Nov 15 23:43:13 2013
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
namespace Apache.NMS.MQTT
{
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Fri Nov 15 23:43:13 2013
@@ -18,7 +18,7 @@ using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Specialized;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
using Apache.NMS.MQTT.Util;
using Apache.NMS.MQTT.Threads;
using Apache.NMS.Util;
@@ -27,6 +27,7 @@ namespace Apache.NMS.MQTT
{
public class MessageConsumer : IMessageConsumer, IDispatcher
{
+ private readonly Session session;
private readonly MessageTransformation messageTransformation;
private readonly MessageDispatchChannel unconsumedMessages;
private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
@@ -61,8 +62,111 @@ namespace Apache.NMS.MQTT
set { this.consumerTransformer = value; }
}
+
+ public event MessageListener Listener
+ {
+ add
+ {
+ CheckClosed();
+
+ bool wasStarted = this.session.Started;
+
+ if(wasStarted)
+ {
+ this.session.Stop();
+ }
+
+ listener += value;
+ this.session.Redispatch(this.unconsumedMessages);
+
+ if(wasStarted)
+ {
+ this.session.Start();
+ }
+ }
+ remove { listener -= value; }
+ }
+
#endregion
+ public void Start()
+ {
+ if(this.unconsumedMessages.Closed)
+ {
+ return;
+ }
+
+ this.started.Value = true;
+ this.unconsumedMessages.Start();
+ this.session.Executor.Wakeup();
+ }
+
+ public void Stop()
+ {
+ this.started.Value = false;
+ this.unconsumedMessages.Stop();
+ }
+
+ public bool Iterate()
+ {
+ if(this.listener != null)
+ {
+ MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
+ if(dispatch != null)
+ {
+ this.Dispatch(dispatch);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void CheckClosed()
+ {
+ if(this.unconsumedMessages.Closed)
+ {
+ throw new NMSException("The Consumer has been Closed");
+ }
+ }
+
+ private void CheckMessageListener()
+ {
+ if(this.listener != null)
+ {
+ throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+ }
+ }
+
+ protected bool IsAutoAcknowledgeEach
+ {
+ get
+ {
+ return this.session.IsAutoAcknowledge ||
+ (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue);
+ }
+ }
+
+ protected bool IsAutoAcknowledgeBatch
+ {
+ get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; }
+ }
+
+ protected bool IsIndividualAcknowledge
+ {
+ get { return this.session.IsIndividualAcknowledge; }
+ }
+
+ protected bool IsClientAcknowledge
+ {
+ get { return this.session.IsClientAcknowledge; }
+ }
+
+ internal bool Closed
+ {
+ get { return this.unconsumedMessages.Closed; }
+ }
+
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs Fri Nov 15 23:43:13 2013
@@ -19,7 +19,7 @@ using System;
using System.IO;
using Apache.NMS.Util;
-namespace Apache.NMS.MQTT.Commands
+namespace Apache.NMS.MQTT.Messages
{
public class BytesMessage : MQTTMessage, IBytesMessage
{
@@ -34,7 +34,7 @@ namespace Apache.NMS.MQTT.Commands
return base.Clone();
}
- public override void OnSend()
+ public virtual void OnSend()
{
base.OnSend();
StoreContent();
@@ -563,10 +563,10 @@ namespace Apache.NMS.MQTT.Commands
/// </summary>
private class LengthTrackerStream : Stream
{
- private readonly ActiveMQBytesMessage parent;
+ private readonly BytesMessage parent;
private readonly Stream sink;
- public LengthTrackerStream(Stream sink, ActiveMQBytesMessage parent) : base()
+ public LengthTrackerStream(Stream sink, BytesMessage parent) : base()
{
this.sink = sink;
this.parent = parent;
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Fri Nov 15 23:43:13 2013
@@ -49,12 +49,7 @@ namespace Apache.NMS.MQTT.Messages
return id != null ? id.GetHashCode() : base.GetHashCode();
}
- public override byte GetDataStructureType()
- {
- return ID_ACTIVEMQMESSAGE;
- }
-
- public override object Clone()
+ public virtual object Clone()
{
MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
@@ -118,20 +113,6 @@ namespace Apache.NMS.MQTT.Messages
}
}
- public override bool ReadOnlyProperties
- {
- get{ return base.ReadOnlyProperties; }
-
- set
- {
- if(this.propertyHelper != null)
- {
- this.propertyHelper.ReadOnly = value;
- }
- base.ReadOnlyProperties = value;
- }
- }
-
#region Properties
public IPrimitiveMap Properties
@@ -386,26 +367,6 @@ namespace Apache.NMS.MQTT.Messages
{
Properties[name] = value;
}
-
- // MarshallAware interface
- public override bool IsMarshallAware()
- {
- return true;
- }
-
- public override void BeforeMarshall(OpenWireFormat wireFormat)
- {
- MarshalledProperties = null;
- if(properties != null)
- {
- MarshalledProperties = properties.Marshal();
- }
- }
-
- public override Response Visit(ICommandVisitor visitor)
- {
- return visitor.ProcessMessage(this);
- }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs Fri Nov 15 23:43:13 2013
@@ -16,6 +16,7 @@
*/
using System;
+using Apache.NMS.MQTT.Transport;
namespace Apache.NMS.MQTT.Messages
{
@@ -30,10 +31,8 @@ namespace Apache.NMS.MQTT.Messages
*/
public class MessageDispatch : BaseCommand
{
- ConsumerId consumerId;
- ActiveMQDestination destination;
- Message message;
- int redeliveryCounter;
+ Topic destination;
+ MQTTMessage message;
///
/// <summery>
@@ -46,10 +45,8 @@ namespace Apache.NMS.MQTT.Messages
return GetType().Name + "[ " +
"commandId = " + this.CommandId + ", " +
"responseRequired = " + this.ResponseRequired + ", " +
- "ConsumerId = " + ConsumerId + ", " +
"Destination = " + Destination + ", " +
- "Message = " + Message + ", " +
- "RedeliveryCounter = " + RedeliveryCounter + " ]";
+ "Message = " + Message + " ]";
}
public Exception RollbackCause
@@ -58,38 +55,24 @@ namespace Apache.NMS.MQTT.Messages
set { this.rollbackCause = value; }
}
- public ConsumerId ConsumerId
- {
- get { return consumerId; }
- set { this.consumerId = value; }
- }
-
- public ActiveMQDestination Destination
+ public Topic Destination
{
get { return destination; }
set { this.destination = value; }
}
- public Message Message
+ public MQTTMessage Message
{
get { return message; }
set { this.message = value; }
}
- public int RedeliveryCounter
- {
- get { return redeliveryCounter; }
- set { this.redeliveryCounter = value; }
- }
-
public override int GetHashCode()
{
int answer = 0;
- answer = (answer * 37) + HashCode(ConsumerId);
answer = (answer * 37) + HashCode(Destination);
answer = (answer * 37) + HashCode(Message);
- answer = (answer * 37) + HashCode(RedeliveryCounter);
return answer;
}
@@ -106,10 +89,6 @@ namespace Apache.NMS.MQTT.Messages
public virtual bool Equals(MessageDispatch that)
{
- if(!Equals(this.ConsumerId, that.ConsumerId))
- {
- return false;
- }
if(!Equals(this.Destination, that.Destination))
{
return false;
@@ -118,10 +97,6 @@ namespace Apache.NMS.MQTT.Messages
{
return false;
}
- if(!Equals(this.RedeliveryCounter, that.RedeliveryCounter))
- {
- return false;
- }
return true;
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Fri Nov 15 23:43:13 2013
@@ -16,6 +16,7 @@
//
using System;
using System.Collections;
+using Apache.NMS.MQTT.Messages;
namespace Apache.NMS.MQTT
{
@@ -235,7 +236,7 @@ namespace Apache.NMS.MQTT
public ITopic GetTopic(string name)
{
- return null; // TODO
+ return new Topic(name);
}
public ITemporaryQueue CreateTemporaryQueue()
@@ -384,14 +385,14 @@ namespace Apache.NMS.MQTT
}
}
- public void RemoveProducer(ProducerId objectId)
- {
- connection.RemoveProducer(objectId);
- if(!this.closing)
- {
- producers.Remove(objectId);
- }
- }
+// public void RemoveProducer(ProducerId objectId)
+// {
+// connection.RemoveProducer(objectId);
+// if(!this.closing)
+// {
+// producers.Remove(objectId);
+// }
+// }
private MQTTMessage ConfigureMessage(MQTTMessage message)
{
@@ -416,7 +417,7 @@ namespace Apache.NMS.MQTT
/// <param name="message">
/// A <see cref="ActiveMQMessage"/>
/// </param>
- private static void DoNothingAcknowledge(ActiveMQMessage message)
+ private static void DoNothingAcknowledge(MQTTMessage message)
{
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Fri Nov 15 23:43:13 2013
@@ -17,7 +17,7 @@
using System;
using System.Collections;
-using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Messages;
using Apache.NMS.MQTT.Util;
using Apache.NMS.MQTT.Threads;
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs?rev=1542430&r1=1542429&r2=1542430&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs Fri Nov 15 23:43:13 2013
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-namespace Apache.NMS.MQTT
+namespace Apache.NMS.MQTT.Threads
{
/// <summary>
/// Represents a task that may take a few iterations to complete.
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs?rev=1542430&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs Fri Nov 15 23:43:13 2013
@@ -0,0 +1,1074 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+ /// <summary>
+ /// A facility for applications to schedule tasks for future execution in a background
+ /// thread. Tasks may be scheduled for one-time execution, or for repeated execution at
+ /// regular intervals. Unlike the normal .NET Timer this Timer allows for multiple tasks
+ /// to be scheduled in a single Timer object.
+ ///
+ /// Corresponding to each Timer object is a single background thread that is used to execute
+ /// all of the timer's tasks, sequentially. Timer tasks should complete quickly. If a timer
+ /// task takes excessive time to complete, it "hogs" the timer's task execution thread. This
+ /// can, in turn, delay the execution of subsequent tasks, which may "bunch up" and execute
+ /// in rapid succession when (and if) the offending task finally completes.
+ ///
+ /// After the last live reference to a Timer object goes away and all outstanding tasks have
+ /// completed execution, the timer's task execution thread terminates gracefully (and becomes
+ /// subject to garbage collection). However, this can take arbitrarily long to occur. By default,
+ /// the task execution thread does not run as a Background thread, so it is capable of keeping an
+ /// application from terminating. If a caller wants to terminate a timer's task execution thread
+ /// rapidly, the caller should invoke the timer's cancel method.
+ ///
+ /// If the timer's task execution thread terminates unexpectedly, any further attempt to schedule
+ /// a task on the timer will result in an IllegalStateException, as if the timer's cancel method
+ /// had been invoked.
+ ///
+ /// This class is thread-safe: multiple threads can share a single Timer object without the
+ /// need for external synchronization.
+ ///
+ /// This class does not offer real-time guarantees: it schedules tasks using the
+ /// EventWaitHandle.WaitOne(TimeSpan) method.
+ /// </summary>
+ public class TimerEx
+ {
+ #region Static Id For Anonymous Timer Naming.
+
+ private static long timerId;
+
+ private static long NextId()
+ {
+ return Interlocked.Increment(ref timerId);
+ }
+
+ #endregion
+
+ private readonly TimerImpl impl;
+
+ // Used to finalize thread
+ private readonly DisposeHelper disposal;
+
+ public TimerEx(String name, bool isBackground)
+ {
+ if (name == null)
+ {
+ throw new NullReferenceException("name is null");
+ }
+ this.impl = new TimerImpl(name, isBackground);
+ this.disposal = new DisposeHelper(impl);
+ }
+
+ public TimerEx(String name) : this(name, false)
+ {
+ }
+
+ public TimerEx(bool isBackground) : this("Timer-" + TimerEx.NextId().ToString(), isBackground)
+ {
+ }
+
+ public TimerEx() : this(false)
+ {
+ }
+
+ /// <summary>
+ /// Terminates this timer, discarding any currently scheduled tasks. Does not interfere
+ /// with a currently executing task (if it exists). Once a timer has been terminated,
+ /// its execution thread terminates gracefully, and no more tasks may be scheduled on it.
+ ///
+ /// Note that calling this method from within the run method of a timer task that was
+ /// invoked by this timer absolutely guarantees that the ongoing task execution is the
+ /// last task execution that will ever be performed by this timer.
+ ///
+ /// This method may be called repeatedly; the second and subsequent calls have no effect.
+ /// </summary>
+ public void Cancel()
+ {
+ this.impl.Cancel();
+ }
+
+ /// <summary>
+ /// Removes all cancelled tasks from this timer's task queue. Calling this method has
+ /// no effect on the behavior of the timer, but eliminates the references to the cancelled
+ /// tasks from the queue. If there are no external references to these tasks, they become
+ /// eligible for garbage collection.
+ ///
+ /// Most programs will have no need to call this method. It is designed for use by the
+ /// rare application that cancels a large number of tasks. Calling this method trades
+ /// time for space: the runtime of the method may be proportional to n + c log n, where
+ /// n is the number of tasks in the queue and c is the number of cancelled tasks.
+ ///
+ /// Note that it is permissible to call this method from within a a task scheduled
+ /// on this timer.
+ /// </summary>
+ public int Purge()
+ {
+ lock(this.impl.SyncRoot)
+ {
+ return impl.Purge();
+ }
+ }
+
+ public override string ToString()
+ {
+ return string.Format("[TimerEx{0}]", this.impl.Name);
+ }
+
+ #region WaitCallback Scheduling Methods
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for execution at the specified time. If the
+ /// time is in the past, the task is scheduled for immediate execution. The method returns
+ /// a TimerTask instance that can be used to later cancel the scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, DateTime when)
+ {
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for execution after the specified delay.
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, int delay)
+ {
+ if(delay < 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(-1), false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for execution after the specified delay.
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, TimeSpan delay)
+ {
+ if(delay.CompareTo(TimeSpan.Zero) < 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-delay execution,
+ /// beginning after the specified delay. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, int delay, int period)
+ {
+ if(delay < 0 || period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-delay execution,
+ /// beginning after the specified delay. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, TimeSpan delay, TimeSpan period)
+ {
+ if(delay.CompareTo(TimeSpan.Zero) < 0 || period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ DoScheduleImpl(task, delay, period, false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-delay execution,
+ /// beginning at the specified start time. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, DateTime when, int period)
+ {
+ if (period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-delay execution,
+ /// beginning at the specified start time. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask Schedule(WaitCallback callback, object arg, DateTime when, TimeSpan period)
+ {
+ if (period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, period, false);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning
+ /// after the specified delay. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, int delay, int period)
+ {
+ if(delay < 0 || period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), true);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning
+ /// after the specified delay. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, TimeSpan delay, TimeSpan period)
+ {
+ if(delay.CompareTo(TimeSpan.Zero) < 0 || period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ DoScheduleImpl(task, delay, period, true);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning
+ /// at the specified time. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, DateTime when, int period)
+ {
+ if (period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), true);
+ return task;
+ }
+
+ /// <summary>
+ /// Schedules the specified WaitCallback task for repeated fixed-rate execution, beginning
+ /// at the specified time. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ ///
+ /// The method returns a TimerTask instance that can be used to later cancel the
+ /// scheduled task.
+ /// </summary>
+ public TimerTask ScheduleAtFixedRate(WaitCallback callback, object arg, DateTime when, TimeSpan period)
+ {
+ if (period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ InternalTimerTask task = new InternalTimerTask(callback, arg);
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, period, true);
+ return task;
+ }
+
+ #endregion
+
+ #region TimerTask Scheduling Methods
+
+ /// <summary>
+ /// Schedules the specified TimerTask for execution at the specified time. If the
+ /// time is in the past.
+ /// </summary>
+ public void Schedule(TimerTask task, DateTime when)
+ {
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for execution after the specified delay.
+ /// </summary>
+ public void Schedule(TimerTask task, int delay)
+ {
+ if(delay < 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(-1), false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for execution after the specified delay.
+ /// </summary>
+ public void Schedule(TimerTask task, TimeSpan delay)
+ {
+ if(delay.CompareTo(TimeSpan.Zero) < 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(-1), false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+ /// after the specified delay. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ /// </summary>
+ public void Schedule(TimerTask task, int delay, int period)
+ {
+ if(delay < 0 || period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+ /// after the specified delay. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ /// </summary>
+ public void Schedule(TimerTask task, TimeSpan delay, TimeSpan period)
+ {
+ if(delay.CompareTo(TimeSpan.Zero) < 0 || period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ DoScheduleImpl(task, delay, period, false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+ /// at the specified time. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ /// </summary>
+ public void Schedule(TimerTask task, DateTime when, int period)
+ {
+ if (period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-delay execution, beginning
+ /// at the specified time. Subsequent executions take place at approximately
+ /// regular intervals separated by the specified period.
+ ///
+ /// In fixed-delay execution, each execution is scheduled relative to the actual execution
+ /// time of the previous execution. If an execution is delayed for any reason (such as
+ /// garbage collection or other background activity), subsequent executions will be delayed.
+ ///
+ /// Fixed-delay execution is appropriate for recurring activities that require "smoothness."
+ /// In other words, it is appropriate for activities where it is more important to keep the
+ /// frequency accurate in the short run than in the long run.
+ /// </summary>
+ public void Schedule(TimerTask task, DateTime when, TimeSpan period)
+ {
+ if (period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, period, false);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+ /// after the specified delay. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ /// </summary>
+ public void ScheduleAtFixedRate(TimerTask task, int delay, int period)
+ {
+ if(delay < 0 || period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ DoScheduleImpl(task, TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(period), true);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+ /// after the specified delay. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ /// </summary>
+ public void ScheduleAtFixedRate(TimerTask task, TimeSpan delay, TimeSpan period)
+ {
+ if(delay.CompareTo(TimeSpan.Zero) < 0 || period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ DoScheduleImpl(task, delay, period, true);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+ /// at the specified time. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ /// </summary>
+ public void ScheduleAtFixedRate(TimerTask task, DateTime when, int period)
+ {
+ if (period <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, TimeSpan.FromMilliseconds(period), true);
+ }
+
+ /// <summary>
+ /// Schedules the specified TimerTask for repeated fixed-rate execution, beginning
+ /// at the specified time. Subsequent executions take place at approximately regular
+ /// intervals, separated by the specified period.
+ ///
+ /// In fixed-rate execution, each execution is scheduled relative to the scheduled execution
+ /// time of the initial execution. If an execution is delayed for any reason (such as garbage
+ /// collection or other background activity), two or more executions will occur in rapid
+ /// succession to "catch up."
+ ///
+ /// Fixed-rate execution is appropriate for recurring activities that are sensitive to
+ /// absolute time, such as ringing a chime every hour on the hour, or running scheduled
+ /// maintenance every day at a particular time.
+ /// </summary>
+ public void ScheduleAtFixedRate(TimerTask task, DateTime when, TimeSpan period)
+ {
+ if (period.CompareTo(TimeSpan.Zero) <= 0)
+ {
+ throw new ArgumentOutOfRangeException();
+ }
+
+ TimeSpan delay = when - DateTime.Now;
+ DoScheduleImpl(task, delay, period, true);
+ }
+
+ #endregion
+
+ #region Implementation of Scheduling method.
+
+ private void DoScheduleImpl(TimerTask task, TimeSpan delay, TimeSpan period, bool fixedRate)
+ {
+ if (task == null)
+ {
+ throw new ArgumentNullException("TimerTask cannot be null");
+ }
+
+ lock(this.impl.SyncRoot)
+ {
+ if (impl.Cancelled)
+ {
+ throw new InvalidOperationException();
+ }
+
+ DateTime when = DateTime.Now + delay;
+
+ lock(task.syncRoot)
+ {
+ if (task.IsScheduled)
+ {
+ throw new InvalidOperationException();
+ }
+
+ if (task.cancelled)
+ {
+ throw new InvalidOperationException("Task is already cancelled");
+ }
+
+ task.when = when;
+ task.period = period;
+ task.fixedRate = fixedRate;
+ }
+
+ // insert the newTask into queue
+ impl.InsertTask(task);
+ }
+ }
+
+ #endregion
+
+ #region Interal TimerTask to invoking WaitCallback tasks
+
+ private class InternalTimerTask : TimerTask
+ {
+ private WaitCallback task;
+ private object taskArg;
+
+ public InternalTimerTask(WaitCallback task, object taskArg)
+ {
+ if (task == null)
+ {
+ throw new ArgumentNullException("The WaitCallback task cannot be null");
+ }
+
+ this.task = task;
+ this.taskArg = taskArg;
+ }
+
+ public override void Run()
+ {
+ this.task(taskArg);
+ }
+ }
+
+ #endregion
+
+ #region Timer Heap that sorts Tasks into timed order.
+
+ private sealed class TimerHeap
+ {
+ internal static readonly int DEFAULT_HEAP_SIZE = 256;
+
+ internal TimerTask[] timers = new TimerTask[DEFAULT_HEAP_SIZE];
+ internal int size = 0;
+ internal int deletedCancelledNumber = 0;
+
+ public TimerTask Minimum()
+ {
+ return timers[0];
+ }
+
+ public bool IsEmpty()
+ {
+ return size == 0;
+ }
+
+ public void Insert(TimerTask task)
+ {
+ if (timers.Length == size)
+ {
+ TimerTask[] appendedTimers = new TimerTask[size * 2];
+ timers.CopyTo(appendedTimers, 0);
+ timers = appendedTimers;
+ }
+ timers[size++] = task;
+ UpHeap();
+ }
+
+ public void Delete(int pos)
+ {
+ // posible to delete any position of the heap
+ if (pos >= 0 && pos < size)
+ {
+ timers[pos] = timers[--size];
+ timers[size] = null;
+ DownHeap(pos);
+ }
+ }
+
+ private void UpHeap()
+ {
+ int current = size - 1;
+ int parent = (current - 1) / 2;
+
+ while (timers[current].when < timers[parent].when)
+ {
+ // swap the two
+ TimerTask tmp = timers[current];
+ timers[current] = timers[parent];
+ timers[parent] = tmp;
+
+ // update pos and current
+ current = parent;
+ parent = (current - 1) / 2;
+ }
+ }
+
+ private void DownHeap(int pos)
+ {
+ int current = pos;
+ int child = 2 * current + 1;
+
+ while (child < size && size > 0)
+ {
+ // compare the children if they exist
+ if (child + 1 < size && timers[child + 1].when < timers[child].when)
+ {
+ child++;
+ }
+
+ // compare selected child with parent
+ if (timers[current].when < timers[child].when)
+ {
+ break;
+ }
+
+ // swap the two
+ TimerTask tmp = timers[current];
+ timers[current] = timers[child];
+ timers[child] = tmp;
+
+ // update pos and current
+ current = child;
+ child = 2 * current + 1;
+ }
+ }
+
+ public void Reset()
+ {
+ timers = new TimerTask[DEFAULT_HEAP_SIZE];
+ size = 0;
+ }
+
+ public void AdjustMinimum()
+ {
+ DownHeap(0);
+ }
+
+ public void DeleteIfCancelled()
+ {
+ for (int i = 0; i < size; i++)
+ {
+ if (timers[i].cancelled)
+ {
+ deletedCancelledNumber++;
+ Delete(i);
+ // re-try this point
+ i--;
+ }
+ }
+ }
+
+ internal int GetTask(TimerTask task)
+ {
+ for (int i = 0; i < timers.Length; i++)
+ {
+ if (timers[i] == task)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+ }
+
+ #endregion
+
+ #region TimerEx Task Runner Implementation
+
+ private sealed class TimerImpl
+ {
+ private bool cancelled;
+ private bool finished;
+ private String name;
+ private TimerHeap tasks = new TimerHeap();
+ private System.Threading.Thread runner;
+ private object syncRoot = new object();
+
+ public TimerImpl(String name, bool isBackground)
+ {
+ this.name = name;
+ this.runner = new Thread(new ThreadStart(this.Run));
+ this.runner.Name = name;
+ this.runner.IsBackground = isBackground;
+ this.runner.Start();
+ }
+
+ public String Name
+ {
+ get { return this.name; }
+ }
+
+ public object SyncRoot
+ {
+ get { return this.syncRoot; }
+ }
+
+ public bool Cancelled
+ {
+ get { return this.cancelled; }
+ }
+
+ public bool Finished
+ {
+ set { this.finished = value; }
+ }
+
+ /// <summary>
+ /// Run this Timers event loop in its own Thread.
+ /// </summary>
+ public void Run()
+ {
+ while (true)
+ {
+ TimerTask task;
+ lock (this.syncRoot)
+ {
+ // need to check cancelled inside the synchronized block
+ if (cancelled)
+ {
+ return;
+ }
+
+ if (tasks.IsEmpty())
+ {
+ if (finished)
+ {
+ return;
+ }
+
+ // no tasks scheduled -- sleep until any task appear
+ try
+ {
+ Monitor.Wait(this.syncRoot);
+ }
+ catch (ThreadInterruptedException)
+ {
+ }
+ continue;
+ }
+
+ DateTime currentTime = DateTime.Now;
+
+ task = tasks.Minimum();
+ TimeSpan timeToSleep;
+
+ lock (task.syncRoot)
+ {
+ if (task.cancelled)
+ {
+ tasks.Delete(0);
+ continue;
+ }
+
+ // check the time to sleep for the first task scheduled
+ timeToSleep = task.when - currentTime;
+ }
+
+ if (timeToSleep.CompareTo(TimeSpan.Zero) > 0)
+ {
+ // sleep!
+ try
+ {
+ Monitor.Wait(this.syncRoot, timeToSleep);
+ }
+ catch (ThreadInterruptedException)
+ {
+ }
+ continue;
+ }
+
+ // no sleep is necessary before launching the task
+ lock (task.syncRoot)
+ {
+ int pos = 0;
+ if (tasks.Minimum().when != task.when)
+ {
+ pos = tasks.GetTask(task);
+ }
+ if (task.cancelled)
+ {
+ tasks.Delete(tasks.GetTask(task));
+ continue;
+ }
+
+ // set time to schedule
+ task.ScheduledTime = task.when;
+
+ // remove task from queue
+ tasks.Delete(pos);
+
+ // set when the next task should be launched
+ if (task.period.CompareTo(TimeSpan.Zero) >= 0)
+ {
+ // this is a repeating task,
+ if (task.fixedRate)
+ {
+ // task is scheduled at fixed rate
+ task.when = task.when + task.period;
+ }
+ else
+ {
+ // task is scheduled at fixed delay
+ task.when = DateTime.Now + task.period;
+ }
+
+ // insert this task into queue
+ InsertTask(task);
+ }
+ else
+ {
+ task.when = DateTime.MinValue;
+ }
+ }
+ }
+
+ bool taskCompletedNormally = false;
+ try
+ {
+ task.Run();
+ taskCompletedNormally = true;
+ }
+ finally
+ {
+ if (!taskCompletedNormally)
+ {
+ lock (this)
+ {
+ cancelled = true;
+ }
+ }
+ }
+ }
+ }
+
+ public void InsertTask(TimerTask newTask)
+ {
+ // callers are synchronized
+ tasks.Insert(newTask);
+ Monitor.Pulse(this.syncRoot);
+ }
+
+ public void Cancel()
+ {
+ lock(this.syncRoot)
+ {
+ cancelled = true;
+ tasks.Reset();
+ Monitor.Pulse(this.syncRoot);
+ }
+ }
+
+ public int Purge()
+ {
+ if (tasks.IsEmpty())
+ {
+ return 0;
+ }
+
+ // callers are synchronized
+ tasks.deletedCancelledNumber = 0;
+ tasks.DeleteIfCancelled();
+ return tasks.deletedCancelledNumber;
+ }
+ }
+
+ #endregion
+
+ #region Helper class to handle Timer shutdown when Disposed
+
+ private sealed class DisposeHelper : IDisposable
+ {
+ private readonly TimerImpl impl;
+
+ public DisposeHelper(TimerImpl impl)
+ {
+ this.impl = impl;
+ }
+
+ public void Dispose()
+ {
+ lock(impl.SyncRoot)
+ {
+ impl.Finished = true;
+ Monitor.PulseAll(impl.SyncRoot);
+ }
+ }
+ }
+
+ #endregion
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerEx.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs?rev=1542430&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs Fri Nov 15 23:43:13 2013
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+ /// <summary>
+ /// A Task that is run in a Timer instance either once or repeatedly.
+ /// </summary>
+ public abstract class TimerTask
+ {
+ internal object syncRoot = new object();
+
+ internal DateTime when = DateTime.MinValue;
+ internal DateTime scheduledTime = DateTime.MinValue;
+ internal TimeSpan period;
+ internal bool cancelled;
+ internal bool fixedRate;
+
+ protected TimerTask()
+ {
+ }
+
+ public bool Cancel()
+ {
+ lock(this.syncRoot)
+ {
+ bool willRun = !cancelled && when != DateTime.MinValue;
+ cancelled = true;
+ return willRun;
+ }
+ }
+
+ public DateTime ScheduledExecutionTime
+ {
+ get
+ {
+ lock(this.syncRoot)
+ {
+ return this.scheduledTime;
+ }
+ }
+ }
+
+ public abstract void Run();
+
+ #region Timer Methods
+
+ internal DateTime When
+ {
+ get
+ {
+ lock(this.syncRoot)
+ {
+ return this.when;
+ }
+ }
+ }
+
+ internal DateTime ScheduledTime
+ {
+ set
+ {
+ lock(this.syncRoot)
+ {
+ this.scheduledTime = value;
+ }
+ }
+ }
+
+ internal bool IsScheduled
+ {
+ get
+ {
+ lock(this.syncRoot)
+ {
+ return when != DateTime.MinValue || scheduledTime != DateTime.MinValue;
+ }
+ }
+ }
+
+ #endregion
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TimerTask.cs
------------------------------------------------------------------------------
svn:eol-style = native