You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/15 00:39:24 UTC
svn commit: r1542120 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk:
./ src/main/csharp/ src/main/csharp/Messages/ src/main/csharp/Threads/
src/main/csharp/Transport/
Author: tabish
Date: Thu Nov 14 23:39:24 2013
New Revision: 1542120
URL: http://svn.apache.org/r1542120
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Starting to implement the framework
Added:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs?rev=1542120&r1=1542119&r2=1542120&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs Thu Nov 14 23:39:24 2013
@@ -17,26 +17,22 @@
using System;
-namespace Apache.NMS.ActiveMQ
+namespace Apache.NMS.MQTT
{
-
/// <summary>
/// Exception thrown when an IO error occurs
/// </summary>
public class IOException : NMSException
{
- public IOException()
- : base("IO Exception failed with missing exception log")
+ public IOException() : base("IO Exception failed with missing exception log")
{
}
- public IOException(String msg)
- : base(msg)
+ public IOException(String msg) : base(msg)
{
}
- public IOException(String msg, Exception inner)
- : base(msg, inner)
+ public IOException(String msg, Exception inner) : base(msg, inner)
{
}
}
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT.Commands
+{
+ public class BytesMessage : MQTTMessage, IBytesMessage
+ {
+ private EndianBinaryReader dataIn = null;
+ private EndianBinaryWriter dataOut = null;
+ private MemoryStream outputBuffer = null;
+ private int length = 0;
+
+        /// <summary>
+        /// Stores any buffered body data through StoreContent() and then
+        /// returns the copy produced by the base class.
+        /// </summary>
+        public override Object Clone()
+        {
+            this.StoreContent();
+
+            Object copy = base.Clone();
+            return copy;
+        }
+
+        /// <summary>
+        /// Runs the base class send hook first, then stores any buffered
+        /// body data through StoreContent().
+        /// </summary>
+        public override void OnSend()
+        {
+            base.OnSend();
+
+            this.StoreContent();
+        }
+
+        /// <summary>
+        /// Clears the body via the base class and drops every stream and
+        /// counter this class keeps for reading or writing it.
+        /// </summary>
+        public override void ClearBody()
+        {
+            base.ClearBody();
+
+            // The remaining resets are independent of each other.
+            this.length = 0;
+            this.dataOut = null;
+            this.dataIn = null;
+            this.outputBuffer = null;
+        }
+
+        /// <summary>
+        /// The body length recorded by InitializeReading(). Reading this
+        /// property prepares the message for reading first.
+        /// </summary>
+        public long BodyLength
+        {
+            get
+            {
+                this.InitializeReading();
+
+                long bodyLength = this.length;
+                return bodyLength;
+            }
+        }
+
+        /// <summary>
+        /// Reads the next byte from the message body.
+        /// </summary>
+        /// <returns>The byte read from the body stream.</returns>
+        public byte ReadByte()
+        {
+            InitializeReading();
+
+            try
+            {
+                byte result = dataIn.ReadByte();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                // Converted through CreateMessageEOFException before rethrowing.
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                // NOTE(review): inside this namespace IOException may bind to
+                // Apache.NMS.MQTT.IOException rather than System.IO.IOException - confirm.
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a single byte to the message body.
+        /// </summary>
+        /// <param name="value">The byte to write.</param>
+        public void WriteByte( byte value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                // Any failure is rethrown as whatever NMSExceptionSupport.Create builds.
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next boolean from the message body.
+        /// </summary>
+        /// <returns>The boolean read from the body stream.</returns>
+        public bool ReadBoolean()
+        {
+            InitializeReading();
+
+            try
+            {
+                bool result = dataIn.ReadBoolean();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a boolean to the message body.
+        /// </summary>
+        /// <param name="value">The boolean to write.</param>
+        public void WriteBoolean( bool value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next character from the message body.
+        /// </summary>
+        /// <returns>The character read from the body stream.</returns>
+        public char ReadChar()
+        {
+            InitializeReading();
+
+            try
+            {
+                char result = dataIn.ReadChar();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a character to the message body.
+        /// </summary>
+        /// <param name="value">The character to write.</param>
+        public void WriteChar( char value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next 16 bit integer from the message body.
+        /// </summary>
+        /// <returns>The value read from the body stream.</returns>
+        public short ReadInt16()
+        {
+            InitializeReading();
+
+            try
+            {
+                short result = dataIn.ReadInt16();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a 16 bit integer to the message body.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteInt16( short value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next 32 bit integer from the message body.
+        /// </summary>
+        /// <returns>The value read from the body stream.</returns>
+        public int ReadInt32()
+        {
+            InitializeReading();
+
+            try
+            {
+                int result = dataIn.ReadInt32();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a 32 bit integer to the message body.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteInt32( int value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next 64 bit integer from the message body.
+        /// </summary>
+        /// <returns>The value read from the body stream.</returns>
+        public long ReadInt64()
+        {
+            InitializeReading();
+
+            try
+            {
+                long result = dataIn.ReadInt64();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a 64 bit integer to the message body.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteInt64( long value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next single precision float from the message body.
+        /// </summary>
+        /// <returns>The value read from the body stream.</returns>
+        public float ReadSingle()
+        {
+            InitializeReading();
+
+            try
+            {
+                float result = dataIn.ReadSingle();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a single precision float to the message body.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteSingle( float value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next double precision float from the message body.
+        /// </summary>
+        /// <returns>The value read from the body stream.</returns>
+        public double ReadDouble()
+        {
+            InitializeReading();
+
+            try
+            {
+                double result = dataIn.ReadDouble();
+                return result;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a double precision float to the message body.
+        /// </summary>
+        /// <param name="value">The value to write.</param>
+        public void WriteDouble( double value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads bytes from the message body into the whole of the given array.
+        /// </summary>
+        /// <param name="value">The array to fill, starting at index zero.</param>
+        /// <returns>The count reported by the underlying reader.</returns>
+        public int ReadBytes( byte[] value )
+        {
+            InitializeReading();
+
+            try
+            {
+                int bytesRead = dataIn.Read( value, 0, value.Length );
+                return bytesRead;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Reads up to <paramref name="length"/> bytes from the message body
+        /// into the start of the given array.
+        /// </summary>
+        /// <param name="value">The array to fill, starting at index zero.</param>
+        /// <param name="length">The number of bytes requested.</param>
+        /// <returns>The count reported by the underlying reader.</returns>
+        public int ReadBytes( byte[] value, int length )
+        {
+            InitializeReading();
+
+            try
+            {
+                int bytesRead = dataIn.Read( value, 0, length );
+                return bytesRead;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends the entire array to the message body.
+        /// </summary>
+        /// <param name="value">The bytes to write.</param>
+        public void WriteBytes( byte[] value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value, 0, value.Length );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Appends a slice of the array to the message body.
+        /// </summary>
+        /// <param name="value">The array holding the bytes to write.</param>
+        /// <param name="offset">Index of the first byte to write.</param>
+        /// <param name="length">Number of bytes to write.</param>
+        public void WriteBytes( byte[] value, int offset, int length )
+        {
+            InitializeWriting();
+
+            try
+            {
+                this.dataOut.Write( value, offset, length );
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Reads the next string from the message body.
+        /// </summary>
+        /// <returns>The string read from the body stream.</returns>
+        public string ReadString()
+        {
+            InitializeReading();
+
+            try
+            {
+                // Strings carry a 16 bit size header, the encoding shared by
+                // JMS, CMS and NMS.
+                string text = dataIn.ReadString16();
+                return text;
+            }
+            catch(EndOfStreamException eof)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eof);
+            }
+            catch(IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+        }
+
+        /// <summary>
+        /// Appends a string to the message body.
+        /// </summary>
+        /// <param name="value">The string to write.</param>
+        public void WriteString( string value )
+        {
+            InitializeWriting();
+
+            try
+            {
+                // Strings carry a 16 bit size header, the encoding shared by
+                // JMS, CMS and NMS.
+                this.dataOut.WriteString16(value);
+            }
+            catch(Exception cause)
+            {
+                throw NMSExceptionSupport.Create(cause);
+            }
+        }
+
+        /// <summary>
+        /// Writes a boxed primitive, a string or a byte array to the message
+        /// body, picking the encoding from the runtime type of the value.
+        /// </summary>
+        /// <param name="value">The value to write; must not be null.</param>
+        /// <exception cref="MessageFormatException">
+        /// Thrown when the value is null or is not one of the supported types.
+        /// </exception>
+        public void WriteObject( System.Object value )
+        {
+            // Checked up front so a read-only body is reported before any
+            // complaint about the value itself.
+            InitializeWriting();
+
+            if( value == null )
+            {
+                // A null used to fall through to value.GetType() in the final
+                // branch and surface as a NullReferenceException.
+                throw new MessageFormatException("Cannot write a null value");
+            }
+
+            // Each case goes through the typed writer so that stream failures
+            // are wrapped the same way as for every other Write method.
+            if( value is System.Byte )
+            {
+                WriteByte( (byte) value );
+            }
+            else if( value is Char )
+            {
+                WriteChar( (char) value );
+            }
+            else if( value is Boolean )
+            {
+                WriteBoolean( (bool) value );
+            }
+            else if( value is Int16 )
+            {
+                WriteInt16( (short) value );
+            }
+            else if( value is Int32 )
+            {
+                WriteInt32( (int) value );
+            }
+            else if( value is Int64 )
+            {
+                WriteInt64( (long) value );
+            }
+            else if( value is Single )
+            {
+                WriteSingle( (float) value );
+            }
+            else if( value is Double )
+            {
+                WriteDouble( (double) value );
+            }
+            else if( value is byte[] )
+            {
+                WriteBytes( (byte[]) value );
+            }
+            else if( value is String )
+            {
+                WriteString( (string) value );
+            }
+            else
+            {
+                throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+            }
+        }
+
+        /// <summary>
+        /// Gets the body as a byte array, or appends a byte array to the body.
+        /// The getter returns null when the recorded body length is zero; the
+        /// setter ignores a null value.
+        /// </summary>
+        public new byte[] Content
+        {
+            get
+            {
+                InitializeReading();
+
+                if(this.length == 0)
+                {
+                    return null;
+                }
+
+                byte[] buffer = new byte[this.length];
+                int filled = 0;
+
+                // A single Read call may hand back fewer bytes than requested,
+                // so keep reading until the buffer is full or the stream ends.
+                while(filled < buffer.Length)
+                {
+                    int count = this.dataIn.Read(buffer, filled, buffer.Length - filled);
+                    if(count <= 0)
+                    {
+                        break;
+                    }
+
+                    filled += count;
+                }
+
+                return buffer;
+            }
+
+            set
+            {
+                InitializeWriting();
+                if(null != value)
+                {
+                    this.dataOut.Write(value, 0, value.Length);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Stores any buffered body data, discards the reader, writer and
+        /// output buffer, and marks the body as read-only.
+        /// </summary>
+        public void Reset()
+        {
+            StoreContent();
+
+            // Order among these resets does not matter.
+            this.outputBuffer = null;
+            this.dataOut = null;
+            this.dataIn = null;
+
+            this.ReadOnlyBody = true;
+        }
+
+        /// <summary>
+        /// Prepares the message for reading. Fails if the body is write-only;
+        /// otherwise creates the reader over the stored content on first use
+        /// and records the body length.
+        /// </summary>
+        private void InitializeReading()
+        {
+            FailIfWriteOnlyBody();
+
+            if(this.dataIn != null)
+            {
+                // Reader already set up by an earlier call.
+                return;
+            }
+
+            // Fetch the stored content once and test that same value, rather
+            // than reading the property a second time for the null check.
+            byte[] data = base.Content;
+            if(data == null)
+            {
+                data = new byte[0];
+            }
+
+            Stream target = new MemoryStream(data, false);
+
+            if(this.Connection != null && this.Compressed == true)
+            {
+                // The stored content starts with a 32 bit length, read here
+                // before the rest is handed to the decompression stream.
+                EndianBinaryReader reader = new EndianBinaryReader(target);
+                this.length = reader.ReadInt32();
+
+                target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
+            }
+            else
+            {
+                this.length = data.Length;
+            }
+
+            this.dataIn = new EndianBinaryReader(target);
+        }
+
+        /// <summary>
+        /// Prepares the message for writing. Fails if the body is read-only;
+        /// otherwise creates the output buffer and writer on first use, adding
+        /// compression and length tracking when the connection asks for it.
+        /// </summary>
+        private void InitializeWriting()
+        {
+            FailIfReadOnlyBody();
+
+            if(this.dataOut != null)
+            {
+                return;
+            }
+
+            this.outputBuffer = new MemoryStream();
+            Stream sink = this.outputBuffer;
+
+            bool compress = this.Connection != null && this.Connection.UseCompression;
+            if(compress)
+            {
+                this.length = 0;
+                this.Compressed = true;
+
+                // The tracker sits in front of the compression stream so it
+                // sees the bytes as the writer produces them.
+                Stream compressor = this.Connection.CompressionPolicy.CreateCompressionStream(sink);
+                sink = new LengthTrackerStream(compressor, this);
+            }
+
+            this.dataOut = new EndianBinaryWriter(sink);
+        }
+
+        /// <summary>
+        /// Closes the writer, if one is open, and copies what was written into
+        /// the base class content. A compressed body is stored as the tracked
+        /// length followed by the compressed bytes.
+        /// </summary>
+        private void StoreContent()
+        {
+            if(this.dataOut == null)
+            {
+                // Nothing has been written since the last store.
+                return;
+            }
+
+            bool compressedBody = (this.Compressed == true);
+
+            // Both layouts need the writer closed before the buffer is copied.
+            this.dataOut.Close();
+
+            if(!compressedBody)
+            {
+                base.Content = outputBuffer.ToArray();
+            }
+            else
+            {
+                MemoryStream framed = new MemoryStream();
+                EndianBinaryWriter header = new EndianBinaryWriter(framed);
+
+                header.Write(this.length);
+                if(this.length > 0)
+                {
+                    byte[] compressed = this.outputBuffer.ToArray();
+                    header.Write(compressed, 0, compressed.Length);
+                }
+                header.Close();
+
+                base.Content = framed.ToArray();
+            }
+
+            this.dataOut = null;
+            this.outputBuffer = null;
+        }
+
+        /// <summary>
+        /// Used when message compression is enabled to track how many bytes
+        /// the EndianBinaryWriter actually writes to the stream before compression
+        /// so that the receiving client can read off the real body length from the
+        /// Message before the data is actually read.
+        /// </summary>
+        /// <remarks>
+        /// Every operation is forwarded to the wrapped stream; the two write
+        /// methods additionally add to the owning message's length field.
+        /// </remarks>
+        private class LengthTrackerStream : Stream
+        {
+            // The owner is the enclosing BytesMessage: it passes itself in and
+            // its private length field is what gets updated here.
+            private readonly BytesMessage parent;
+            private readonly Stream sink;
+
+            /// <summary>
+            /// Wraps the given stream on behalf of the given message.
+            /// </summary>
+            /// <param name="sink">The stream that receives all operations.</param>
+            /// <param name="parent">The message whose length is tracked.</param>
+            public LengthTrackerStream(Stream sink, BytesMessage parent) : base()
+            {
+                this.sink = sink;
+                this.parent = parent;
+            }
+
+            public override void Close()
+            {
+                this.sink.Close();
+                base.Close();
+            }
+
+            public override long Position
+            {
+                get { return this.sink.Position; }
+                set { this.sink.Position = value; }
+            }
+
+            public override long Length
+            {
+                get { return this.sink.Length; }
+            }
+
+            public override bool CanSeek
+            {
+                get { return this.sink.CanSeek; }
+            }
+
+            public override bool CanRead
+            {
+                get { return this.sink.CanRead; }
+            }
+
+            public override bool CanWrite
+            {
+                get { return this.sink.CanWrite; }
+            }
+
+            public override int ReadByte()
+            {
+                return this.sink.ReadByte();
+            }
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                return this.sink.Read(buffer, offset, count);
+            }
+
+            public override void WriteByte(byte value)
+            {
+                this.parent.length++;
+                this.sink.WriteByte(value);
+            }
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                // A null buffer is silently ignored and counts for nothing.
+                if(null != buffer)
+                {
+                    this.parent.length += count;
+                    this.sink.Write(buffer, offset, count);
+                }
+            }
+
+            public override void Flush()
+            {
+                this.sink.Flush();
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                return this.sink.Seek(offset, origin);
+            }
+
+            public override void SetLength(long value)
+            {
+                this.sink.SetLength(value);
+            }
+        }
+
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/BytesMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Util;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Messages
+{
+ public delegate void AcknowledgeHandler(MQTTMessage message);
+
+ public class MQTTMessage : IMessage
+ {
+ private readonly PUBLISH publish = new PUBLISH();
+ private MessagePropertyIntercepter propertyHelper;
+ private PrimitiveMap properties;
+ private Connection connection;
+
+ public event AcknowledgeHandler Acknowledger;
+
+ public static MQTTMessage Transform(IMessage message)
+ {
+ return (MQTTMessage) message;
+ }
+
+ public MQTTMessage() : base()
+ {
+ Timestamp = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
+ }
+
+        /// <summary>
+        /// Uses the hash of the message id when one is set, otherwise falls
+        /// back to the base class hash.
+        /// </summary>
+        public override int GetHashCode()
+        {
+            MessageId id = this.MessageId;
+
+            if(id != null)
+            {
+                return id.GetHashCode();
+            }
+
+            return base.GetHashCode();
+        }
+
+ public override byte GetDataStructureType()
+ {
+ return ID_ACTIVEMQMESSAGE;
+ }
+
+ public override object Clone()
+ {
+ MQTTMessage cloneMessage = (MQTTMessage) base.Clone();
+
+ cloneMessage.propertyHelper = new MessagePropertyIntercepter(cloneMessage, cloneMessage.properties, this.ReadOnlyProperties) { AllowByteArrays = false };
+ return cloneMessage;
+ }
+
+        /// <summary>
+        /// Compares against another object: anything that is not an
+        /// MQTTMessage is unequal, otherwise the typed overload decides.
+        /// </summary>
+        public override bool Equals(object that)
+        {
+            if(!(that is MQTTMessage))
+            {
+                return false;
+            }
+
+            MQTTMessage other = (MQTTMessage) that;
+            return Equals(other);
+        }
+
+        /// <summary>
+        /// Two messages are equal when both have a message id and the ids
+        /// are equal.
+        /// </summary>
+        /// <param name="that">The message to compare with; may be null.</param>
+        /// <returns>
+        /// False when <paramref name="that"/> is null or either id is missing,
+        /// otherwise the result of comparing the two ids.
+        /// </returns>
+        public virtual bool Equals(MQTTMessage that)
+        {
+            // Equality with null is false; without this guard the property
+            // access below would throw instead.
+            if(Object.ReferenceEquals(that, null))
+            {
+                return false;
+            }
+
+            MessageId oMsg = that.MessageId;
+            MessageId thisMsg = this.MessageId;
+
+            return thisMsg != null && oMsg != null && oMsg.Equals(thisMsg);
+        }
+
+        /// <summary>
+        /// Raises the Acknowledger event for this message.
+        /// </summary>
+        /// <exception cref="NMSException">
+        /// Thrown when no handler is attached to the Acknowledger event.
+        /// </exception>
+        public void Acknowledge()
+        {
+            if(null != Acknowledger)
+            {
+                Acknowledger(this);
+                return;
+            }
+
+            throw new NMSException("No Acknowledger has been associated with this message: " + this);
+        }
+
+ public virtual void ClearBody()
+ {
+ this.ReadOnlyBody = false;
+ this.Content = null;
+ }
+
+ public virtual void ClearProperties()
+ {
+ this.MarshalledProperties = null;
+ this.ReadOnlyProperties = false;
+ this.Properties.Clear();
+ }
+
+ protected void FailIfReadOnlyBody()
+ {
+ if(ReadOnlyBody == true)
+ {
+ throw new MessageNotWriteableException("Message is in Read-Only mode.");
+ }
+ }
+
+ protected void FailIfWriteOnlyBody()
+ {
+ if( ReadOnlyBody == false )
+ {
+ throw new MessageNotReadableException("Message is in Write-Only mode.");
+ }
+ }
+
+ public override bool ReadOnlyProperties
+ {
+ get{ return base.ReadOnlyProperties; }
+
+ set
+ {
+ if(this.propertyHelper != null)
+ {
+ this.propertyHelper.ReadOnly = value;
+ }
+ base.ReadOnlyProperties = value;
+ }
+ }
+
+ #region Properties
+
+ public IPrimitiveMap Properties
+ {
+ get
+ {
+ if(null == properties)
+ {
+ properties = PrimitiveMap.Unmarshal(MarshalledProperties);
+ propertyHelper = new MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties)
+ {AllowByteArrays = false};
+
+ // Since JMS doesn't define a Byte array interface for properties we
+ // disable them here to prevent sending invalid data to the broker.
+ }
+
+ return propertyHelper;
+ }
+ }
+
+ public IDestination FromDestination
+ {
+ get { return Destination; }
+ set { this.Destination = ActiveMQDestination.Transform(value); }
+ }
+
+ public Connection Connection
+ {
+ get { return this.connection; }
+ set { this.connection = value; }
+ }
+
+ /// <summary>
+ /// The correlation ID used to correlate messages with conversations or long running business processes
+ /// </summary>
+ public string NMSCorrelationID
+ {
+ get { return CorrelationId; }
+ set { CorrelationId = value; }
+ }
+
+ /// <summary>
+ /// The destination of the message
+ /// </summary>
+ public IDestination NMSDestination
+ {
+ get { return Destination; }
+ set { Destination = value as ActiveMQDestination; }
+ }
+
+        private TimeSpan timeToLive = TimeSpan.FromMilliseconds(0);
+        /// <summary>
+        /// How long the message should live. Setting it also updates
+        /// Expiration: Timestamp plus the span when positive, zero otherwise.
+        /// </summary>
+        public TimeSpan NMSTimeToLive
+        {
+            get
+            {
+                // With an Expiration set but no positive span stored, derive
+                // the span from the Expiration and the Timestamp.
+                bool deriveFromExpiration = Expiration > 0 && timeToLive.TotalMilliseconds <= 0.0;
+                if(deriveFromExpiration)
+                {
+                    timeToLive = TimeSpan.FromMilliseconds(Expiration - Timestamp);
+                }
+
+                return timeToLive;
+            }
+
+            set
+            {
+                timeToLive = value;
+
+                if(timeToLive.TotalMilliseconds <= 0)
+                {
+                    Expiration = 0;
+                    return;
+                }
+
+                Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
+            }
+        }
+
+        /// <summary>
+        /// The message id as a string. Reads as String.Empty when no id is set.
+        /// Assigning null clears the id; assigning text that cannot be parsed
+        /// stores a default-constructed MessageId instead.
+        /// </summary>
+        public string NMSMessageId
+        {
+            get
+            {
+                if(null == MessageId)
+                {
+                    return String.Empty;
+                }
+
+                return BaseDataStreamMarshaller.ToString(MessageId);
+            }
+
+            set
+            {
+                if(value == null)
+                {
+                    this.MessageId = null;
+                    return;
+                }
+
+                try
+                {
+                    MessageId parsed = new MessageId(value);
+                    this.MessageId = parsed;
+                }
+                catch(FormatException)
+                {
+                    // The text came from a foreign provider or an unusual
+                    // user-supplied string, so fall back to a fresh MessageId.
+                    MessageId fallback = new MessageId();
+                    this.MessageId = fallback;
+                }
+            }
+        }
+
+ /// <summary>
+ /// Whether or not this message is persistent
+ /// </summary>
+ public MsgDeliveryMode NMSDeliveryMode
+ {
+ get { return (Persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
+ set { Persistent = (MsgDeliveryMode.Persistent == value); }
+ }
+
+ /// <summary>
+ /// The Priority on this message
+ /// </summary>
+ public MsgPriority NMSPriority
+ {
+ get { return (MsgPriority) Priority; }
+ set { Priority = (byte) value; }
+ }
+
+        /// <summary>
+        /// True when the redelivery counter is above zero. Setting it to true
+        /// raises a non-positive counter to one; setting it to false resets a
+        /// positive counter to zero. Otherwise the counter is left untouched.
+        /// </summary>
+        public bool NMSRedelivered
+        {
+            get { return (RedeliveryCounter > 0); }
+
+            set
+            {
+                bool alreadyRedelivered = this.RedeliveryCounter > 0;
+
+                if(value == alreadyRedelivered)
+                {
+                    // The counter already agrees with the requested state.
+                    return;
+                }
+
+                if(value)
+                {
+                    this.RedeliveryCounter = 1;
+                }
+                else
+                {
+                    this.RedeliveryCounter = 0;
+                }
+            }
+        }
+
+ /// <summary>
+ /// The destination that the consumer of this message should send replies to
+ /// </summary>
+ public IDestination NMSReplyTo
+ {
+ get { return ReplyTo; }
+ set { ReplyTo = ActiveMQDestination.Transform(value); }
+ }
+
+ /// <summary>
+ /// The timestamp the broker added to the message
+ /// </summary>
+ public DateTime NMSTimestamp
+ {
+ get { return DateUtils.ToDateTime(Timestamp); }
+ set
+ {
+ Timestamp = DateUtils.ToJavaTimeUtc(value);
+ if(timeToLive.TotalMilliseconds > 0)
+ {
+ Expiration = Timestamp + (long) timeToLive.TotalMilliseconds;
+ }
+ }
+ }
+
+ /// <summary>
+ /// The type name of this message
+ /// </summary>
+ public string NMSType
+ {
+ get { return Type; }
+ set { Type = value; }
+ }
+
+ #endregion
+
+ #region NMS Extension headers
+
+        /// <summary>
+        /// Returns the delivery count of this message: the redelivery counter
+        /// plus one.
+        /// </summary>
+        public int NMSXDeliveryCount
+        {
+            get
+            {
+                int redeliveries = RedeliveryCounter;
+                return redeliveries + 1;
+            }
+        }
+
+ /// <summary>
+ /// The Message Group ID used to group messages together to the same consumer for the same group ID value
+ /// </summary>
+ public string NMSXGroupID
+ {
+ get { return GroupID; }
+ set { GroupID = value; }
+ }
+ /// <summary>
+ /// The Message Group Sequence counter to indicate the position in a group
+ /// </summary>
+ public int NMSXGroupSeq
+ {
+ get { return GroupSequence; }
+ set { GroupSequence = value; }
+ }
+
+        /// <summary>
+        /// Returns the producer's transaction id as a string, preferring
+        /// OriginalTransactionId over TransactionId, or String.Empty when
+        /// neither is set.
+        /// </summary>
+        public string NMSXProducerTXID
+        {
+            get
+            {
+                TransactionId txnId = OriginalTransactionId;
+
+                if(null != txnId)
+                {
+                    return BaseDataStreamMarshaller.ToString(txnId);
+                }
+
+                txnId = TransactionId;
+
+                if(null == txnId)
+                {
+                    return String.Empty;
+                }
+
+                return BaseDataStreamMarshaller.ToString(txnId);
+            }
+        }
+
+ #endregion
+
+ public object GetObjectProperty(string name)
+ {
+ return Properties[name];
+ }
+
+ public void SetObjectProperty(string name, object value)
+ {
+ Properties[name] = value;
+ }
+
+ // MarshallAware interface
+ public override bool IsMarshallAware()
+ {
+ return true;
+ }
+
+ public override void BeforeMarshall(OpenWireFormat wireFormat)
+ {
+ MarshalledProperties = null;
+ if(properties != null)
+ {
+ MarshalledProperties = properties.Marshal();
+ }
+ }
+
+ public override Response Visit(ICommandVisitor visitor)
+ {
+ return visitor.ProcessMessage(this);
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT.Messages
+{
+    /*
+     *
+     *  Command code for OpenWire format for MessageDispatch
+     *
+     *  NOTE!: This file is auto generated - do not modify!
+     *         if you need to make a change, please see the Java Classes
+     *         in the nms-activemq-openwire-generator module
+     *
+     */
+    public class MessageDispatch : BaseCommand
+    {
+        ConsumerId consumerId;
+        ActiveMQDestination destination;
+        Message message;
+        int redeliveryCounter;
+
+        // Backing field for RollbackCause; the property used it without it
+        // ever being declared.
+        Exception rollbackCause;
+
+        /// <summary>
+        /// Returns a string containing the information for this DataStructure
+        /// such as its type and value of its elements.
+        /// </summary>
+        public override string ToString()
+        {
+            return GetType().Name + "[ " +
+                "commandId = " + this.CommandId + ", " +
+                "responseRequired = " + this.ResponseRequired + ", " +
+                "ConsumerId = " + ConsumerId + ", " +
+                "Destination = " + Destination + ", " +
+                "Message = " + Message + ", " +
+                "RedeliveryCounter = " + RedeliveryCounter + " ]";
+        }
+
+        /// <summary>
+        /// An exception stored alongside this dispatch. It takes no part in
+        /// ToString, GetHashCode or Equals.
+        /// </summary>
+        public Exception RollbackCause
+        {
+            get { return this.rollbackCause; }
+            set { this.rollbackCause = value; }
+        }
+
+        public ConsumerId ConsumerId
+        {
+            get { return consumerId; }
+            set { this.consumerId = value; }
+        }
+
+        public ActiveMQDestination Destination
+        {
+            get { return destination; }
+            set { this.destination = value; }
+        }
+
+        public Message Message
+        {
+            get { return message; }
+            set { this.message = value; }
+        }
+
+        public int RedeliveryCounter
+        {
+            get { return redeliveryCounter; }
+            set { this.redeliveryCounter = value; }
+        }
+
+        /// <summary>
+        /// Combines the hashes of the consumer id, destination, message and
+        /// redelivery counter.
+        /// </summary>
+        public override int GetHashCode()
+        {
+            int answer = 0;
+
+            answer = (answer * 37) + HashCode(ConsumerId);
+            answer = (answer * 37) + HashCode(Destination);
+            answer = (answer * 37) + HashCode(Message);
+            answer = (answer * 37) + HashCode(RedeliveryCounter);
+
+            return answer;
+        }
+
+        public override bool Equals(object that)
+        {
+            if(that is MessageDispatch)
+            {
+                return Equals((MessageDispatch) that);
+            }
+
+            return false;
+        }
+
+        /// <summary>
+        /// Compares the consumer id, destination, message and redelivery
+        /// counter of the two dispatches.
+        /// </summary>
+        /// <param name="that">The dispatch to compare with; may be null.</param>
+        /// <returns>True only when all four members are equal.</returns>
+        public virtual bool Equals(MessageDispatch that)
+        {
+            // Equality with null is false rather than an exception.
+            if(Object.ReferenceEquals(that, null))
+            {
+                return false;
+            }
+
+            if(!Equals(this.ConsumerId, that.ConsumerId))
+            {
+                return false;
+            }
+            if(!Equals(this.Destination, that.Destination))
+            {
+                return false;
+            }
+            if(!Equals(this.Message, that.Message))
+            {
+                return false;
+            }
+            if(!Equals(this.RedeliveryCounter, that.RedeliveryCounter))
+            {
+                return false;
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Return an answer of true to the isMessageDispatch() query.
+        /// </summary>
+        public override bool IsMessageDispatch
+        {
+            get { return true; }
+        }
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MessageDispatch.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+    /// <summary>
+    /// A TaskRunner that dedicates a single thread to running a single Task.
+    /// The worker thread is created and started by the constructor and keeps
+    /// calling the Task's Iterate() method until the runner is shut down.
+    /// </summary>
+    public class DedicatedTaskRunner : TaskRunner
+    {
+        // Monitor lock guarding the three state flags below. A plain object is
+        // used rather than a System.Threading.Mutex: only lock/Monitor ever
+        // operate on it, and a Mutex would hold an OS wait handle that is
+        // never used as such and never disposed.
+        private readonly object mutex = new object();
+        private readonly Thread theThread = null;
+        private readonly Task task = null;
+
+        // Set by the worker thread, under the lock, once Run() has exited.
+        private bool terminated = false;
+        // Set by Wakeup()/Shutdown to request another pass over the task.
+        private bool pending = false;
+        // Set once a shutdown has been requested; never cleared.
+        private bool shutdown = false;
+
+        /// <summary>
+        /// Creates a runner for the given task using the thread name
+        /// "MQTT Task" and normal thread priority.
+        /// </summary>
+        /// <param name="task">The task to run; must not be null.</param>
+        public DedicatedTaskRunner(Task task)
+            : this(task, "MQTT Task", ThreadPriority.Normal)
+        {
+        }
+
+        /// <summary>
+        /// Creates a runner for the given task and immediately starts a
+        /// background thread with the supplied name and priority.
+        /// </summary>
+        /// <param name="task">The task to run; must not be null.</param>
+        /// <param name="taskName">Name assigned to the worker thread.</param>
+        /// <param name="taskPriority">Priority assigned to the worker thread.</param>
+        /// <exception cref="NullReferenceException">
+        /// Thrown when <paramref name="task"/> is null.
+        /// </exception>
+        public DedicatedTaskRunner(Task task, string taskName, ThreadPriority taskPriority)
+        {
+            if(task == null)
+            {
+                // NOTE(review): ArgumentNullException would be the conventional
+                // type; NullReferenceException is kept so that existing callers
+                // observe the same exception type.
+                throw new NullReferenceException("Task was null");
+            }
+
+            this.task = task;
+
+            // All fields read by Run() are assigned before the thread starts.
+            this.theThread = new Thread(Run);
+            this.theThread.IsBackground = true;
+            this.theThread.Priority = taskPriority;
+            this.theThread.Name = taskName;
+            this.theThread.Start();
+        }
+
+        /// <summary>
+        /// Requests shutdown and waits up to <paramref name="timeout"/> for the
+        /// worker thread to signal that it has terminated. Does not wait when
+        /// called from the worker thread itself or when the worker has already
+        /// terminated.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait on the monitor.</param>
+        public void Shutdown(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                SignalShutdown();
+
+                if(ShouldWaitForWorker())
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Requests shutdown and waits without a time limit (a timeout of -1
+        /// milliseconds is passed to Shutdown(TimeSpan)).
+        /// </summary>
+        public void Shutdown()
+        {
+            this.Shutdown(TimeSpan.FromMilliseconds(-1));
+        }
+
+        /// <summary>
+        /// Requests shutdown and waits up to <paramref name="timeout"/> for the
+        /// worker thread; if the worker has still not terminated after the
+        /// wait, the worker thread is aborted.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait on the monitor.</param>
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                SignalShutdown();
+
+                if(ShouldWaitForWorker())
+                {
+                    Monitor.Wait(this.mutex, timeout);
+
+                    if(!this.terminated)
+                    {
+                        this.theThread.Abort();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Marks work as pending and wakes the worker thread. Has no effect
+        /// once a shutdown has been requested.
+        /// </summary>
+        public void Wakeup()
+        {
+            lock(this.mutex)
+            {
+                if(this.shutdown)
+                {
+                    return;
+                }
+
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Worker thread body. Repeatedly calls the task's Iterate() method;
+        /// when Iterate() returns false the thread sleeps on the monitor until
+        /// work is flagged as pending. Exits when shutdown has been requested
+        /// and always publishes the terminated flag on the way out.
+        /// </summary>
+        internal void Run()
+        {
+            try
+            {
+                while(true)
+                {
+                    lock(this.mutex)
+                    {
+                        // Clear the request before iterating so that a Wakeup()
+                        // arriving during Iterate() is not lost.
+                        this.pending = false;
+                        if(this.shutdown)
+                        {
+                            return;
+                        }
+                    }
+
+                    if(!this.task.Iterate())
+                    {
+                        // Nothing more to do right now: wait to be notified.
+                        lock(this.mutex)
+                        {
+                            if(this.shutdown)
+                            {
+                                return;
+                            }
+
+                            while(!this.pending)
+                            {
+                                Monitor.Wait(this.mutex);
+                            }
+                        }
+                    }
+                }
+            }
+            catch(ThreadAbortException)
+            {
+                // Prevent the ThreadAbortException from propagating.
+                Thread.ResetAbort();
+            }
+            catch
+            {
+                // Deliberate best effort: any other failure, including one
+                // thrown by the task, silently ends the worker thread.
+            }
+            finally
+            {
+                // Make sure we notify any waiting threads that the thread
+                // has terminated.
+                lock(this.mutex)
+                {
+                    this.terminated = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        // Flags the runner for shutdown and wakes the worker thread.
+        // Must be called while holding the mutex.
+        private void SignalShutdown()
+        {
+            this.shutdown = true;
+            this.pending = true;
+
+            Monitor.PulseAll(this.mutex);
+        }
+
+        // True when the caller should block for the worker to exit: there is
+        // no point waiting when shutdown is requested from the worker thread
+        // itself or when the worker has already terminated.
+        // Must be called while holding the mutex.
+        private bool ShouldWaitForWorker()
+        {
+            return Thread.CurrentThread != this.theThread && !this.terminated;
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+
+namespace Apache.NMS.MQTT.Threads
+{
+    /// <summary>
+    /// Scheduler Service useful for running various delayed units of work.
+    /// All work is handed to a TimerEx that is created by Start() and
+    /// cancelled by Stop(); scheduling before Start() throws.
+    /// </summary>
+    public class Scheduler
+    {
+        // Guards timer, timerTasks and started.
+        private readonly object syncRoot = new object();
+        private readonly String name;
+        private TimerEx timer;
+        // Periodic tasks keyed by the callback that was used to register them,
+        // so that the same callback can later be passed to Cancel().
+        private readonly Dictionary<Object, TimerTask> timerTasks = new Dictionary<object, TimerTask>();
+        private bool started = false;
+
+        /// <summary>
+        /// Creates a stopped scheduler; call Start() before scheduling work.
+        /// </summary>
+        /// <param name="name">Name passed to the underlying TimerEx.</param>
+        public Scheduler(String name)
+        {
+            this.name = name;
+        }
+
+        /// <summary>
+        /// Executes the given task periodically using a fixed-delay execution style
+        /// which prevents tasks from bunching should there be some delay such as
+        /// garbage collection or machine sleep. The period is used both as the
+        /// initial delay and as the repeat interval.
+        ///
+        /// This repeating unit of work can later be cancelled by passing the same
+        /// WaitCallback to Cancel().
+        /// </summary>
+        /// <param name="task">The callback to run.</param>
+        /// <param name="arg">State object handed to the timer with the callback.</param>
+        /// <param name="period">Delay and repeat interval (presumably milliseconds; confirm against TimerEx).</param>
+        /// <exception cref="InvalidOperationException">The scheduler is not started.</exception>
+        /// <exception cref="ArgumentException">The callback is already registered.</exception>
+        public void ExecutePeriodically(WaitCallback task, object arg, int period)
+        {
+            lock (this.syncRoot)
+            {
+                CheckStarted();
+                // Reject duplicates before touching the timer; otherwise the
+                // task would be scheduled and then orphaned when the registry
+                // refuses the duplicate key.
+                CheckNotAlreadyScheduled(task);
+
+                TimerTask timerTask = timer.Schedule(task, arg, period, period);
+                timerTasks.Add(task, timerTask);
+            }
+        }
+
+        /// <summary>
+        /// Executes the given task periodically using a fixed-delay execution style
+        /// which prevents tasks from bunching should there be some delay such as
+        /// garbage collection or machine sleep. The period is used both as the
+        /// initial delay and as the repeat interval.
+        ///
+        /// This repeating unit of work can later be cancelled by passing the same
+        /// WaitCallback to Cancel().
+        /// </summary>
+        /// <param name="task">The callback to run.</param>
+        /// <param name="arg">State object handed to the timer with the callback.</param>
+        /// <param name="period">Delay and repeat interval.</param>
+        /// <exception cref="InvalidOperationException">The scheduler is not started.</exception>
+        /// <exception cref="ArgumentException">The callback is already registered.</exception>
+        public void ExecutePeriodically(WaitCallback task, object arg, TimeSpan period)
+        {
+            lock (this.syncRoot)
+            {
+                CheckStarted();
+                CheckNotAlreadyScheduled(task);
+
+                TimerTask timerTask = timer.Schedule(task, arg, period, period);
+                timerTasks.Add(task, timerTask);
+            }
+        }
+
+        /// <summary>
+        /// Executes the given task after the delay; no reference is kept for this
+        /// task so it cannot be cancelled later.
+        /// </summary>
+        /// <param name="task">The callback to run.</param>
+        /// <param name="arg">State object handed to the timer with the callback.</param>
+        /// <param name="delay">Delay before execution (presumably milliseconds; confirm against TimerEx).</param>
+        /// <exception cref="InvalidOperationException">The scheduler is not started.</exception>
+        public void ExecuteAfterDelay(WaitCallback task, object arg, int delay)
+        {
+            lock (this.syncRoot)
+            {
+                CheckStarted();
+
+                // Schedule while still holding the lock so that a concurrent
+                // Stop()/Start() cannot cancel or swap the timer in between
+                // the check and the call.
+                timer.Schedule(task, arg, delay);
+            }
+        }
+
+        /// <summary>
+        /// Executes the given task after the delay; no reference is kept for this
+        /// task so it cannot be cancelled later.
+        /// </summary>
+        /// <param name="task">The callback to run.</param>
+        /// <param name="arg">State object handed to the timer with the callback.</param>
+        /// <param name="delay">Delay before execution.</param>
+        /// <exception cref="InvalidOperationException">The scheduler is not started.</exception>
+        public void ExecuteAfterDelay(WaitCallback task, object arg, TimeSpan delay)
+        {
+            lock (this.syncRoot)
+            {
+                CheckStarted();
+                timer.Schedule(task, arg, delay);
+            }
+        }
+
+        /// <summary>
+        /// Cancels a periodic task previously registered through
+        /// ExecutePeriodically. Unknown tasks are ignored.
+        /// </summary>
+        /// <param name="task">The callback that was used to register the work.</param>
+        public void Cancel(object task)
+        {
+            lock (this.syncRoot)
+            {
+                TimerTask ticket;
+                if (!timerTasks.TryGetValue(task, out ticket))
+                {
+                    return;
+                }
+
+                if (ticket != null)
+                {
+                    ticket.Cancel();
+                    timer.Purge(); // remove cancelled TimerTasks
+                }
+
+                timerTasks.Remove(task);
+            }
+        }
+
+        /// <summary>
+        /// Creates the underlying timer and marks the scheduler as started.
+        /// Calling Start() on a scheduler that is already started does nothing,
+        /// so a running timer is never replaced without being cancelled.
+        /// </summary>
+        public void Start()
+        {
+            lock (this.syncRoot)
+            {
+                if (this.started)
+                {
+                    return;
+                }
+
+                this.timer = new TimerEx(name, true);
+                this.started = true;
+            }
+        }
+
+        /// <summary>
+        /// Marks the scheduler as stopped, cancels the underlying timer if one
+        /// was created, and forgets every registered periodic task.
+        /// </summary>
+        public void Stop()
+        {
+            lock (this.syncRoot)
+            {
+                this.started = false;
+                if (this.timer != null)
+                {
+                    this.timer.Cancel();
+                }
+
+                // The tickets belong to the timer that was just cancelled; drop
+                // them so the same callbacks can be registered again after a
+                // later Start().
+                this.timerTasks.Clear();
+            }
+        }
+
+        /// <summary>
+        /// The name supplied at construction.
+        /// </summary>
+        public String Name
+        {
+            get { return this.name; }
+        }
+
+        /// <summary>
+        /// True between a call to Start() and the following call to Stop().
+        /// </summary>
+        public bool Started
+        {
+            get
+            {
+                lock (this.syncRoot)
+                {
+                    return this.started;
+                }
+            }
+        }
+
+        public override string ToString()
+        {
+            return string.Format("[Scheduler][{0}]", name);
+        }
+
+        // Throws when the scheduler has not been started.
+        // Must be called while holding syncRoot.
+        private void CheckStarted()
+        {
+            if (!this.started)
+            {
+                throw new InvalidOperationException("The Schedular has not been started yet");
+            }
+        }
+
+        // Throws the same exception type the registry would raise for a
+        // duplicate key, but before anything has been handed to the timer.
+        // Must be called while holding syncRoot.
+        private void CheckNotAlreadyScheduled(WaitCallback task)
+        {
+            if (task != null && timerTasks.ContainsKey(task))
+            {
+                throw new ArgumentException("The task is already scheduled for periodic execution", "task");
+            }
+        }
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Scheduler.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MQTT
+{
+ /// <summary>
+ /// Represents a task that may take a few iterations to complete. A task is
+ /// driven by a runner that calls Iterate() repeatedly.
+ /// </summary>
+ public interface Task
+ {
+ /// <summary>
+ /// Performs some portion of the work that this Task object is
+ /// assigned to complete. When there is nothing further to do this
+ /// method should return false; DedicatedTaskRunner then stops calling
+ /// it until the runner is woken up again.
+ /// </summary>
+ /// <returns>
+ /// A <see cref="System.Boolean"/> that indicates whether this Task should
+ /// be run again straight away.
+ /// </returns>
+ bool Iterate();
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/Task.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT.Threads
+{
+ /// <summary>
+ /// Allows you to request that a thread execute the associated Task.
+ /// </summary>
+ public interface TaskRunner
+ {
+ /// <summary>
+ /// Wakes up the TaskRunner and has it check for any pending work that
+ /// needs to be completed. If none is found it will go back to sleep
+ /// until another Wakeup call is made.
+ /// </summary>
+ void Wakeup();
+
+ /// <summary>
+ /// Attempts to shut down the TaskRunner. This method will wait indefinitely
+ /// for the TaskRunner to quit if the task runner is in a call to its Task's
+ /// run method and that call never returns.
+ /// </summary>
+ void Shutdown();
+
+ /// <summary>
+ /// Performs a timed wait for the TaskRunner to shut down. If the TaskRunner
+ /// is in a call to its Task's run method and that does not return before the
+ /// timeout expires, this method returns and the TaskRunner may remain in the
+ /// running state.
+ /// </summary>
+ /// <param name="timeout">
+ /// A <see cref="TimeSpan"/> giving the maximum time to wait.
+ /// </param>
+ void Shutdown(TimeSpan timeout);
+
+ /// <summary>
+ /// Performs a timed wait for the TaskRunner to shut down. If the TaskRunner
+ /// is in a call to its Task's run method and that does not return before the
+ /// timeout expires, this method sends an Abort to the Task thread and returns.
+ /// </summary>
+ /// <param name="timeout">
+ /// A <see cref="TimeSpan"/> giving the maximum time to wait before aborting.
+ /// </param>
+ void ShutdownWithAbort(TimeSpan timeout);
+
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunner.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+    /// <summary>
+    /// Factory for TaskRunner instances. Depending on the dedicatedTaskRunner
+    /// flag it hands out either a DedicatedTaskRunner, which is given the
+    /// requested thread name and priority, or a PooledTaskRunner, which is
+    /// given maxIterationsPerRun instead.
+    /// </summary>
+    public class TaskRunnerFactory
+    {
+        // Thread name used when the caller does not supply one.
+        public string name = "MQTT Task";
+        // Thread priority used when the caller does not supply one.
+        public ThreadPriority priority = ThreadPriority.Normal;
+        // Passed to PooledTaskRunner; not used for dedicated runners.
+        public int maxIterationsPerRun = 1000;
+        // Selects DedicatedTaskRunner (true) or PooledTaskRunner (false).
+        public bool dedicatedTaskRunner = true;
+
+        public TaskRunnerFactory()
+        {
+        }
+
+        /// <summary>
+        /// Creates a runner for the task using this factory's name and priority.
+        /// </summary>
+        public TaskRunner CreateTaskRunner(Task task)
+        {
+            return CreateTaskRunner(task, this.name, this.priority);
+        }
+
+        /// <summary>
+        /// Creates a runner for the task using the given name and this
+        /// factory's priority.
+        /// </summary>
+        public TaskRunner CreateTaskRunner(Task task, string name)
+        {
+            return CreateTaskRunner(task, name, this.priority);
+        }
+
+        /// <summary>
+        /// Creates a runner for the task. The name and priority only take
+        /// effect when a DedicatedTaskRunner is created; the pooled variant
+        /// receives the task and maxIterationsPerRun only.
+        /// </summary>
+        public TaskRunner CreateTaskRunner(Task task, string name, ThreadPriority taskPriority)
+        {
+            if(!this.dedicatedTaskRunner)
+            {
+                return new PooledTaskRunner(task, this.maxIterationsPerRun);
+            }
+
+            return new DedicatedTaskRunner(task, name, taskPriority);
+        }
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs?rev=1542120&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs Thu Nov 14 23:39:24 2013
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// Common interface for transport commands. Besides being cloneable, a
+ /// command exposes an id, a response-required flag and a set of read-only
+ /// Is* properties used to query what kind of command it is.
+ /// </summary>
+ public interface Command : ICloneable
+ {
+ // Integer identifier of this command.
+ // NOTE(review): how ids are assigned is not visible here - confirm with the transport code.
+ int CommandId
+ {
+ get;
+ set;
+ }
+
+ // Presumably tells the receiver whether a response to this command is expected - TODO confirm.
+ bool ResponseRequired
+ {
+ get;
+ set;
+ }
+
+ // The Is* properties below are read-only type queries. MessageDispatch,
+ // for example, overrides IsMessageDispatch to return true; the other
+ // command kinds presumably do the same for their own property.
+
+ bool IsDestinationInfo
+ {
+ get;
+ }
+
+ bool IsKeepAliveInfo
+ {
+ get;
+ }
+
+ bool IsMessage
+ {
+ get;
+ }
+
+ bool IsMessageAck
+ {
+ get;
+ }
+
+ bool IsMessageDispatch
+ {
+ get;
+ }
+
+ bool IsProducerInfo
+ {
+ get;
+ }
+
+ bool IsResponse
+ {
+ get;
+ }
+
+ bool IsSessionInfo
+ {
+ get;
+ }
+
+ bool IsShutdownInfo
+ {
+ get;
+ }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Command.cs
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj?rev=1542120&r1=1542119&r2=1542120&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/vs2008-mqtt.csproj Thu Nov 14 23:39:24 2013
@@ -39,23 +39,46 @@
<ItemGroup>
<Compile Include="src\main\csharp\ConnectionFactory.cs" />
<Compile Include="src\main\csharp\ConnectionMetaData.cs" />
- <Compile Include="src\main\csharp\commands\CONNECT.cs" />
- <Compile Include="src\main\csharp\commands\CONNACK.cs" />
- <Compile Include="src\main\csharp\commands\PUBLISH.cs" />
- <Compile Include="src\main\csharp\commands\PUBACK.cs" />
- <Compile Include="src\main\csharp\commands\PUBREC.cs" />
- <Compile Include="src\main\csharp\commands\PUBREL.cs" />
- <Compile Include="src\main\csharp\commands\SUBSCRIBE.cs" />
- <Compile Include="src\main\csharp\commands\SUBACK.cs" />
- <Compile Include="src\main\csharp\commands\UNSUBSCRIBE.cs" />
- <Compile Include="src\main\csharp\commands\UNSUBACK.cs" />
- <Compile Include="src\main\csharp\commands\PINGREQ.cs" />
- <Compile Include="src\main\csharp\commands\PINGRESP.cs" />
- <Compile Include="src\main\csharp\commands\DISCONNECT.cs" />
+ <Compile Include="src\main\csharp\Connection.cs" />
+ <Compile Include="src\main\csharp\CommonAssemblyInfo.cs" />
+ <Compile Include="src\main\csharp\Session.cs" />
+ <Compile Include="src\main\csharp\MessageConsumer.cs" />
+ <Compile Include="src\main\csharp\MessageProducer.cs" />
+ <Compile Include="src\main\csharp\Commands\CONNACK.cs" />
+ <Compile Include="src\main\csharp\Commands\CONNECT.cs" />
+ <Compile Include="src\main\csharp\Commands\DISCONNECT.cs" />
+ <Compile Include="src\main\csharp\Commands\PINGREQ.cs" />
+ <Compile Include="src\main\csharp\Commands\PINGRESP.cs" />
+ <Compile Include="src\main\csharp\Commands\PUBACK.cs" />
+ <Compile Include="src\main\csharp\Commands\PUBLISH.cs" />
+ <Compile Include="src\main\csharp\Commands\PUBREC.cs" />
+ <Compile Include="src\main\csharp\Commands\PUBREL.cs" />
+ <Compile Include="src\main\csharp\Commands\SUBACK.cs" />
+ <Compile Include="src\main\csharp\Commands\SUBSCRIBE.cs" />
+ <Compile Include="src\main\csharp\Commands\UNSUBACK.cs" />
+ <Compile Include="src\main\csharp\Commands\UNSUBSCRIBE.cs" />
+ <Compile Include="src\main\csharp\Threads\ThreadPoolExecutor.cs" />
+ <Compile Include="src\main\csharp\Transport\ITransport.cs" />
+ <Compile Include="src\main\csharp\Transport\ITransportFactory.cs" />
+ <Compile Include="src\main\csharp\Util\MQTTMessageTransformation.cs" />
+ <Compile Include="src\main\csharp\ConnectionClosedException.cs" />
+ <Compile Include="src\main\csharp\IDispatcher.cs" />
+ <Compile Include="src\main\csharp\IOException.cs" />
+ <Compile Include="src\main\csharp\SessionExecutor.cs" />
+ <Compile Include="src\main\csharp\Util\FifoMessageDispatchChannel.cs" />
+ <Compile Include="src\main\csharp\Util\MessageDispatchChannel.cs" />
+ <Compile Include="src\main\csharp\Threads\DedicatedTaskRunner.cs" />
+ <Compile Include="src\main\csharp\Threads\Scheduler.cs" />
+ <Compile Include="src\main\csharp\Threads\Task.cs" />
+ <Compile Include="src\main\csharp\Threads\TaskRunner.cs" />
+ <Compile Include="src\main\csharp\Threads\TaskRunnerFactory.cs" />
+ <Compile Include="src\main\csharp\Messages\MessageDispatch.cs" />
+ <Compile Include="src\main\csharp\Messages\BytesMessage.cs" />
+ <Compile Include="src\main\csharp\Messages\MQTTMessage.cs" />
+ <Compile Include="src\main\csharp\Transport\Command.cs" />
</ItemGroup>
<ItemGroup>
<Folder Include="keyfile\" />
- <Folder Include="src\main\csharp\commands\" />
</ItemGroup>
<ItemGroup>
<None Include="LICENSE.txt" />