You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/15 00:18:57 UTC
svn commit: r1542115 - in
/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp: ./
Commands/ Messages/ Threads/ Transport/ Transport/Tcp/ Util/ commands/
Author: tabish
Date: Thu Nov 14 23:18:56 2013
New Revision: 1542115
URL: http://svn.apache.org/r1542115
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Starting to implement the framework
Added:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/
- copied from r1541299, activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/commands/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/Tcp/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (with props)
Removed:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/commands/
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs?rev=1542115&r1=1541299&r2=1542115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Commands/CONNECT.cs Thu Nov 14 23:18:56 2013
@@ -29,6 +29,7 @@ namespace Apache.NMS.MQTT.Commands
public class CONNECT
{
public const byte TYPE = 1;
+ public const String PROTOCOL_NAME = "MQIsdp";
private byte version = 3;
public byte Version
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,27 @@
+using System;
+using System.Reflection;
+using System.Runtime.InteropServices;
+
+// ------------------------------------------------------------------------------
+// <autogenerated>
+// This code was generated by a tool.
+// Mono Runtime Version: 4.0.30319.1
+//
+// Changes to this file may cause incorrect behavior and will be lost if
+// the code is regenerated.
+// </autogenerated>
+// ------------------------------------------------------------------------------
+
+[assembly: ComVisibleAttribute(false)]
+[assembly: CLSCompliantAttribute(true)]
+[assembly: AssemblyTitleAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyDescriptionAttribute("Apache NMS for MQTT Class Library (.Net Messaging Library Implementation): An implementation of the NMS API for MQTT")]
+[assembly: AssemblyConfigurationAttribute("SNAPSHOT")]
+[assembly: AssemblyCompanyAttribute("http://activemq.apache.org/nms")]
+[assembly: AssemblyProductAttribute("Apache NMS for MQTT Class Library")]
+[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2013 Apache Software Foundation")]
+[assembly: AssemblyTrademarkAttribute("")]
+[assembly: AssemblyCultureAttribute("")]
+[assembly: AssemblyVersionAttribute("1.7.0.3237")]
+[assembly: AssemblyInformationalVersionAttribute("1.7.0")]
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/CommonAssemblyInfo.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,338 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using Apache.NMS.MQTT.Transport;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT
+{
+ public class Connection : IConnection
+ {
+ private readonly CONNECT info = null;
+ private ITransport transport;
+ private readonly Uri brokerUri;
+ private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+ private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+ private readonly object myLock = new object();
+ private readonly Atomic<bool> connected = new Atomic<bool>(false);
+ private readonly Atomic<bool> closed = new Atomic<bool>(false);
+ private readonly Atomic<bool> closing = new Atomic<bool>(false);
+ private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+ private Exception firstFailureError = null;
+ private int sessionCounter = 0;
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private ConnectionMetaData metaData = null;
+ private bool disposed = false;
+ private readonly MessageTransformation messageTransformation;
+ private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+
+ /// <summary>
+ /// Creates a connection for the given broker URI on top of the supplied transport.
+ /// The transport's Command, Exception, Interrupted and Resumed handlers are pointed
+ /// at this connection and a fresh CONNECT command is created to hold the
+ /// connection settings (user name, password).
+ /// </summary>
+ /// <param name="connectionUri">The broker URI, exposed through BrokerUri.</param>
+ /// <param name="transport">The transport this connection communicates over.</param>
+ /// <exception cref="ArgumentNullException">Thrown when transport is null.</exception>
+ public Connection(Uri connectionUri, ITransport transport)
+ {
+ // Fail with a descriptive argument error instead of a NullReferenceException
+ // raised from inside SetTransport.
+ if(transport == null)
+ {
+ throw new ArgumentNullException("transport");
+ }
+
+ this.brokerUri = connectionUri;
+
+ SetTransport(transport);
+
+ this.info = new CONNECT();
+ }
+
+ ~Connection()
+ {
+ Dispose(false);
+ }
+
+ #region Properties
+
+ /// <summary>
+ /// A delegate that can receive transport level exceptions.
+ /// </summary>
+ public event ExceptionListener ExceptionListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been interrupted.
+ /// </summary>
+ public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+ /// <summary>
+ /// An asynchronous listener that is notified when a Fault tolerant connection
+ /// has been resumed.
+ /// </summary>
+ public event ConnectionResumedListener ConnectionResumedListener;
+
+ private ConsumerTransformerDelegate consumerTransformer;
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumerTransformer; }
+ set { this.consumerTransformer = value; }
+ }
+
+ private ProducerTransformerDelegate producerTransformer;
+ public ProducerTransformerDelegate ProducerTransformer
+ {
+ get { return this.producerTransformer; }
+ set { this.producerTransformer = value; }
+ }
+
+ public String UserName
+ {
+ get { return this.info.UserName; }
+ set { this.info.UserName = value; }
+ }
+
+ public String Password
+ {
+ get { return this.info.Password; }
+ set { this.info.Password = value; }
+ }
+
+ /// <summary>
+ /// This property sets the acknowledgment mode for the connection.
+ /// The URI parameter connection.ackmode can be set to a string value
+ /// that maps to the enumeration value.
+ /// </summary>
+ public string AckMode
+ {
+ set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+ }
+
+ public IConnectionMetaData MetaData
+ {
+ get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+ }
+
+ public Uri BrokerUri
+ {
+ get { return brokerUri; }
+ }
+
+ public ITransport ITransport
+ {
+ get { return transport; }
+ set { this.transport = value; }
+ }
+
+ public bool TransportFailed
+ {
+ get { return this.transportFailed.Value; }
+ }
+
+ public Exception FirstFailureError
+ {
+ get { return this.firstFailureError; }
+ }
+
+ public TimeSpan RequestTimeout
+ {
+ get { return this.requestTimeout; }
+ set { this.requestTimeout = value; }
+ }
+
+ public AcknowledgementMode AcknowledgementMode
+ {
+ get { return acknowledgementMode; }
+ set { this.acknowledgementMode = value; }
+ }
+
+ internal MessageTransformation MessageTransformation
+ {
+ get { return this.messageTransformation; }
+ }
+
+ #endregion
+
+ private void SetTransport(ITransport newTransport)
+ {
+ this.transport = newTransport;
+ this.transport.Command = new CommandHandler(OnCommand);
+ this.transport.Exception = new ExceptionHandler(OnTransportException);
+ this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
+ this.transport.Resumed = new ResumedHandler(OnTransportResumed);
+ }
+
+ /// <summary>
+ /// Marks this connection as started. When the started flag actually changes
+ /// from false to true, Start is called on every session currently registered
+ /// with the connection while holding the session list's SyncRoot lock.
+ /// CheckConnected is invoked first, before the flag is touched.
+ /// </summary>
+ public void Start()
+ {
+ CheckConnected();
+
+ if(!started.CompareAndSet(false, true))
+ {
+ // The flag was not switched by this call, nothing to do.
+ return;
+ }
+
+ lock(sessions.SyncRoot)
+ {
+ foreach(Session current in sessions)
+ {
+ current.Start();
+ }
+ }
+ }
+
+ /// <summary>
+ /// This property determines if the asynchronous message delivery of incoming
+ /// messages has been started for this connection.
+ /// </summary>
+ public bool IsStarted
+ {
+ get { return started.Value; }
+ }
+
+ /// <summary>
+ /// Marks this connection as stopped. When the started flag actually changes
+ /// from true to false, Stop is called on every session currently registered
+ /// with the connection while holding the session list's SyncRoot lock.
+ /// </summary>
+ public void Stop()
+ {
+ if(!started.CompareAndSet(true, false))
+ {
+ // The flag was not switched by this call, nothing to do.
+ return;
+ }
+
+ lock(sessions.SyncRoot)
+ {
+ foreach(Session current in sessions)
+ {
+ current.Stop();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession()
+ {
+ return CreateMQTTSession(acknowledgementMode);
+ }
+
+ /// <summary>
+ /// Creates a new session to work on this connection
+ /// </summary>
+ public ISession CreateSession(AcknowledgementMode sessionAcknowledgementMode)
+ {
+ return CreateMQTTSession(sessionAcknowledgementMode);
+ }
+
+ protected virtual Session CreateMQTTSession(AcknowledgementMode ackMode)
+ {
+ CheckConnected();
+ return new Session(this, NextSessionId, ackMode);
+ }
+
+ internal void AddSession(Session session)
+ {
+ if(!this.closing.Value)
+ {
+ sessions.Add(session);
+ }
+ }
+
+ internal void RemoveSession(Session session)
+ {
+ if(!this.closing.Value)
+ {
+ sessions.Remove(session);
+ RemoveDispatcher(session);
+ }
+ }
+
+ internal void AddDispatcher(ConsumerId id, IDispatcher dispatcher)
+ {
+ if(!this.closing.Value)
+ {
+ this.dispatchers.Add(id, dispatcher);
+ }
+ }
+
+ internal void RemoveDispatcher(ConsumerId id)
+ {
+ if(!this.closing.Value)
+ {
+ this.dispatchers.Remove(id);
+ }
+ }
+
+ internal void AddProducer(ProducerId id, MessageProducer producer)
+ {
+ if(!this.closing.Value)
+ {
+ this.producers.Add(id, producer);
+ }
+ }
+
+ internal void RemoveProducer(ProducerId id)
+ {
+ if(!this.closing.Value)
+ {
+ this.producers.Remove(id);
+ }
+ }
+
+ internal void RemoveDispatcher(IDispatcher dispatcher)
+ {
+ this.connectionAudit.RemoveDispatcher(dispatcher);
+ }
+
+ public void Close()
+ {
+ // TODO
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ try
+ {
+ Close();
+ }
+ catch
+ {
+ // Ignore network errors.
+ }
+
+ disposed = true;
+ }
+
+ /// <summary>
+ /// Verifies that the connection is neither closed nor running on a failed transport.
+ /// </summary>
+ /// <exception cref="ConnectionClosedException">Thrown by CheckClosed when the connection is closed.</exception>
+ /// <exception cref="ConnectionFailedException">Thrown when the transport has been flagged as failed.</exception>
+ protected void CheckClosedOrFailed()
+ {
+ CheckClosed();
+ if(transportFailed.Value)
+ {
+ // firstFailureError starts out as null, so read it once and fall back to a
+ // generic message rather than raising a NullReferenceException here.
+ Exception cause = firstFailureError;
+ throw new ConnectionFailedException(cause != null ? cause.Message : "The transport has failed.");
+ }
+ }
+
+ protected void CheckClosed()
+ {
+ if(closed.Value)
+ {
+ throw new ConnectionClosedException();
+ }
+ }
+
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT
+{
+ /// <summary>
+ /// Exception thrown when a connection is used that is already closed
+ /// </summary>
+ [Serializable]
+ public class ConnectionClosedException : NMSException
+ {
+ public ConnectionClosedException() : base("The connection is already closed!")
+ {
+ }
+
+ public ConnectionClosedException(string message)
+ : base(message)
+ {
+ }
+
+ public ConnectionClosedException(string message, string errorCode)
+ : base(message, errorCode)
+ {
+ }
+
+ public ConnectionClosedException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
+
+ public ConnectionClosedException(string message, string errorCode, Exception innerException)
+ : base(message, errorCode, innerException)
+ {
+ }
+
+ #region ISerializable interface implementation
+
+ /// <summary>
+ /// Initializes a new instance of the ConnectionClosedException class with serialized data.
+ /// Throws System.ArgumentNullException if the info parameter is null.
+ /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+ /// </summary>
+ /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+ /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+ protected ConnectionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+ : base(info, context)
+ {
+ }
+
+ #endregion
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionClosedException.cs
------------------------------------------------------------------------------
svn:executable = *
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs?rev=1542115&r1=1542114&r2=1542115&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/ConnectionMetaData.cs Thu Nov 14 23:18:56 2013
@@ -47,8 +47,7 @@ namespace Apache.NMS.MQTT
this.providerMinorVersion = asmName.Version.Minor;
this.providerVersion = asmName.Version.ToString();
- this.nmsxProperties =
- new String[]{ "NMSXGroupID", "NMSXGroupSeq", "NMSXDeliveryCount", "NMSXProducerTXID" };
+ this.nmsxProperties = null;
foreach(AssemblyName name in self.GetReferencedAssemblies())
{
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT
+{
+ /// <summary>
+ /// Interface implemented by classes that provide a dispatching service for
+ /// a MessageDispatch command.
+ /// </summary>
+ public interface IDispatcher
+ {
+ void Dispatch(MessageDispatch messageDispatch);
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IDispatcher.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT
+{
+
+ /// <summary>
+ /// Exception thrown when an IO error occurs. Declared in the Apache.NMS.MQTT
+ /// namespace like the other types of this provider.
+ /// </summary>
+ public class IOException : NMSException
+ {
+ /// <summary>
+ /// Creates the exception with a default message for the case where no
+ /// further detail about the failure is available.
+ /// </summary>
+ public IOException()
+ : base("IO Exception failed with missing exception log")
+ {
+ }
+
+ /// <summary>
+ /// Creates the exception with the given message.
+ /// </summary>
+ /// <param name="msg">Text describing the IO failure.</param>
+ public IOException(String msg)
+ : base(msg)
+ {
+ }
+
+ /// <summary>
+ /// Creates the exception with the given message and the exception that caused it.
+ /// </summary>
+ /// <param name="msg">Text describing the IO failure.</param>
+ /// <param name="inner">The underlying exception, passed to the base class as the inner exception.</param>
+ public IOException(String msg, Exception inner)
+ : base(msg, inner)
+ {
+ }
+ }
+}
+
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/IOException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,68 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
+using Apache.NMS.MQTT.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.MQTT
+{
+ public class MessageConsumer : IMessageConsumer, IDispatcher
+ {
+ private readonly MessageTransformation messageTransformation;
+ private readonly MessageDispatchChannel unconsumedMessages;
+ private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+
+ private readonly Atomic<bool> started = new Atomic<bool>();
+
+ private Exception failureError;
+ private ThreadPoolExecutor executor;
+
+ private event MessageListener listener;
+
+ public MessageConsumer()
+ {
+ }
+
+ #region Property Accessors
+
+ public Exception FailureError
+ {
+ get { return this.failureError; }
+ set { this.failureError = value; }
+ }
+
+ private ConsumerTransformerDelegate consumerTransformer;
+ /// <summary>
+ /// A Delegate that is called each time a Message is dispatched to allow the client to do
+ /// any necessary transformations on the received message before it is delivered.
+ /// </summary>
+ public ConsumerTransformerDelegate ConsumerTransformer
+ {
+ get { return this.consumerTransformer; }
+ set { this.consumerTransformer = value; }
+ }
+
+ #endregion
+
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,190 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
+
+namespace Apache.NMS.MQTT
+{
+ public class MessageProducer : IMessageProducer
+ {
+ private readonly Session session;
+ private readonly object closedLock = new object();
+ private bool closed = false;
+
+ private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
+ private TimeSpan requestTimeout;
+ protected bool disposed = false;
+
+ private readonly MessageTransformation messageTransformation;
+
+ public MessageProducer(Session session, TimeSpan requestTimeout)
+ {
+ this.session = session;
+ this.RequestTimeout = requestTimeout;
+ this.messageTransformation = session.Connection.MessageTransformation;
+ }
+
+ ~MessageProducer()
+ {
+ Dispose(false);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Closes the producer once; later calls return immediately. Invoked from both
+ /// Dispose() and the finalizer, so errors raised by Close are logged and
+ /// suppressed instead of being propagated to the caller.
+ /// </summary>
+ /// <param name="disposing">True when called from Dispose(), false when called from the finalizer. Not used by the current implementation.</param>
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ try
+ {
+ Close();
+ }
+ catch(Exception ex)
+ {
+ // Disposal is best effort and must not throw, but leave a trace of the
+ // failure rather than discarding it silently.
+ Tracer.DebugFormat("Ignoring error during producer dispose: {0}", ex.Message);
+ }
+ finally
+ {
+ disposed = true;
+ }
+ }
+
+ public void Close()
+ {
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ return;
+ }
+
+ Shutdown();
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Remove of Producer[{0}] sent.", this.ProducerId);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Called from the Parent session to deactivate this Producer, when a parent
+ /// is closed all children are automatically removed from the broker so this
+ /// method circumvents the need to send a Remove command to the broker.
+ /// </summary>
+ internal void Shutdown()
+ {
+ lock(closedLock)
+ {
+ if(closed)
+ {
+ return;
+ }
+
+ try
+ {
+ session.RemoveProducer(info.ProducerId);
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Error during producer close: {0}", ex);
+ }
+
+ closed = true;
+ }
+ }
+
+ public void Send(IMessage message)
+ {
+ Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+ }
+
+ public void Send(IDestination destination, IMessage message)
+ {
+ Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+ }
+
+ public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ {
+ Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+ }
+
+ public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ {
+ Send(destination, message, deliveryMode, priority, timeToLive, true);
+ }
+
+ protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+ {
+ }
+
+ private ProducerTransformerDelegate producerTransformer;
+ public ProducerTransformerDelegate ProducerTransformer
+ {
+ get { return this.producerTransformer; }
+ set { this.producerTransformer = value; }
+ }
+
+ public IMessage CreateMessage()
+ {
+ return session.CreateMessage();
+ }
+
+ public ITextMessage CreateTextMessage()
+ {
+ return session.CreateTextMessage();
+ }
+
+ public ITextMessage CreateTextMessage(string text)
+ {
+ return session.CreateTextMessage(text);
+ }
+
+ public IMapMessage CreateMapMessage()
+ {
+ return session.CreateMapMessage();
+ }
+
+ public IObjectMessage CreateObjectMessage(object body)
+ {
+ return session.CreateObjectMessage(body);
+ }
+
+ public IBytesMessage CreateBytesMessage()
+ {
+ return session.CreateBytesMessage();
+ }
+
+ public IBytesMessage CreateBytesMessage(byte[] body)
+ {
+ return session.CreateBytesMessage(body);
+ }
+
+ public IStreamMessage CreateStreamMessage()
+ {
+ return session.CreateStreamMessage();
+ }
+
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,425 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+using System;
+using System.Collections;
+
+namespace Apache.NMS.MQTT
+{
+    /// <summary>
+    /// NMS session for the MQTT provider. Tracks the consumers and producers created
+    /// through it, owns the <see cref="SessionExecutor"/> used for dispatching, and
+    /// rejects the NMS features that this provider does not offer (queues, browsers,
+    /// temporary destinations, transactions, and all message types other than bytes).
+    /// </summary>
+    /// <remarks>
+    /// NOTE(review): this class is still scaffolding. Dispose and Close reference
+    /// disposeStopTimeout, info and DoClose, none of which are declared in this class;
+    /// they must be added before the class can build.
+    /// </remarks>
+    public class Session : ISession, IDispatcher
+    {
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
+
+        // Both maps are synchronized wrappers: single operations are safe, but any
+        // enumeration must additionally hold the map's SyncRoot.
+        private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+
+        private readonly SessionExecutor executor;
+        private readonly Connection connection;
+
+        private int consumerCounter;
+        private int producerCounter;
+
+        protected bool disposed = false;
+        protected bool closed = false;
+        protected bool closing = false;
+
+        private readonly AcknowledgementMode acknowledgementMode;
+
+        /// <summary>
+        /// Creates a session bound to the given connection, inherits the connection's
+        /// transformer delegates, starts itself when the connection is already started
+        /// and finally registers itself with the connection.
+        /// </summary>
+        /// <param name="connection">Owning connection.</param>
+        /// <param name="acknowledgementMode">Acknowledgement mode reported by this session.</param>
+        public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.acknowledgementMode = acknowledgementMode;
+
+            this.ConsumerTransformer = connection.ConsumerTransformer;
+            this.ProducerTransformer = connection.ProducerTransformer;
+
+            this.executor = new SessionExecutor(this, this.consumers);
+
+            if(connection.IsStarted)
+            {
+                this.Start();
+            }
+
+            connection.AddSession(this);
+        }
+
+        ~Session()
+        {
+            Dispose(false);
+        }
+
+        #region Property Accessors
+
+        /// <summary>
+        /// The connection this session was created on. SessionExecutor reads this
+        /// property to pick its dispatch channel and to report dispatch failures.
+        /// </summary>
+        public Connection Connection
+        {
+            get { return this.connection; }
+        }
+
+        public virtual AcknowledgementMode AcknowledgementMode
+        {
+            get { return this.acknowledgementMode; }
+        }
+
+        public virtual bool IsClientAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
+        }
+
+        public virtual bool IsAutoAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
+        }
+
+        public virtual bool IsDupsOkAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
+        }
+
+        public virtual bool IsIndividualAcknowledge
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
+        }
+
+        public virtual bool IsTransacted
+        {
+            get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        public SessionExecutor Executor
+        {
+            get { return this.executor; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        /// <summary>
+        /// A Delegate that is called each time a Message is dispatched to allow the client to do
+        /// any necessary transformations on the received message before it is delivered.
+        /// The Session instance sets the delegate on each Consumer it creates.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        /// <summary>
+        /// A delegate that is called each time a Message is sent from this Producer which allows
+        /// the application to perform any needed transformations on the Message before it is sent.
+        /// The Session instance sets the delegate on each Producer it creates.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        #endregion
+
+        #region ISession Members
+
+        /// <summary>
+        /// Stops the executor, closes the session and suppresses finalization.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Shared dispose path for Dispose() and the finalizer. Runs at most once;
+        /// any exception raised while stopping or closing is swallowed.
+        /// </summary>
+        protected void Dispose(bool disposing)
+        {
+            if(this.disposed)
+            {
+                return;
+            }
+
+            try
+            {
+                // Force a Stop when we are Disposing vs a Normal Close.
+                // NOTE(review): disposeStopTimeout is not declared in this class.
+                this.executor.Stop(this.disposeStopTimeout);
+
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            this.disposed = true;
+        }
+
+        /// <summary>
+        /// Closes the session if it is not already marked closed. Failures are logged
+        /// and not propagated.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): info and DoClose are not declared in this class, and nothing
+        /// visible here ever sets the closed flag - presumably DoClose is meant to.
+        /// </remarks>
+        public virtual void Close()
+        {
+            if (!this.closed)
+            {
+                try
+                {
+                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId);
+                    DoClose();
+                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId);
+                }
+                catch (Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+            }
+        }
+
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        /// <summary>
+        /// TODO: not implemented yet; currently always returns null.
+        /// </summary>
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            MessageProducer producer = null;
+            // TODO
+            return producer;
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
+
+        /// <summary>
+        /// Validates the destination. TODO: consumer creation is not implemented yet, so
+        /// null is returned for any non-null destination.
+        /// </summary>
+        /// <exception cref="InvalidDestinationException">destination is null.</exception>
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            if(destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
+
+            MessageConsumer consumer = null;
+            return consumer;
+        }
+
+        /// <summary>
+        /// Validates the destination. TODO: durable consumer creation is not implemented
+        /// yet, so null is returned for any non-null destination.
+        /// </summary>
+        /// <exception cref="InvalidDestinationException">destination is null.</exception>
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
+            if(destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
+
+            MessageConsumer consumer = null;
+            return consumer;
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            return null; // TODO
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            throw new NotSupportedException("Not supported with MQTT Protocol");
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// Always throws: this provider cannot delete destinations.
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            throw new NotSupportedException("MQTT Cannot delete Destinations");
+        }
+
+        public IMessage CreateMessage()
+        {
+            throw new NotSupportedException("No empty Message in MQTT");
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            throw new NotSupportedException("No Text Message in MQTT");
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            throw new NotSupportedException("No Text Message in MQTT");
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            throw new NotSupportedException("No Map Message in MQTT");
+        }
+
+        /// <summary>
+        /// Creates an empty bytes message bound to this session's connection.
+        /// </summary>
+        public IBytesMessage CreateBytesMessage()
+        {
+            return ConfigureMessage(new BytesMessage()) as IBytesMessage;
+        }
+
+        /// <summary>
+        /// Creates a bytes message whose content is the given array, bound to this
+        /// session's connection.
+        /// </summary>
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            BytesMessage answer = new BytesMessage();
+            answer.Content = body;
+            return ConfigureMessage(answer) as IBytesMessage;
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            // The message previously said "Object Message", copied from CreateObjectMessage.
+            throw new NotSupportedException("No Stream Message in MQTT");
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            throw new NotSupportedException("No Object Message in MQTT");
+        }
+
+        public void Commit()
+        {
+            throw new NotSupportedException("No Transaction support in MQTT");
+        }
+
+        public void Rollback()
+        {
+            throw new NotSupportedException("No Transaction support in MQTT");
+        }
+
+        /// <summary>
+        /// Not supported yet. Throws IllegalStateException when the session is closed or
+        /// transacted, otherwise NotSupportedException.
+        /// </summary>
+        public void Recover()
+        {
+            CheckClosed();
+
+            if (acknowledgementMode == AcknowledgementMode.Transactional)
+            {
+                throw new IllegalStateException("Cannot Recover a Transacted Session");
+            }
+
+            throw new NotSupportedException("No Recover support yet");
+        }
+
+        /// <summary>
+        /// Stops the session executor.
+        /// </summary>
+        public void Stop()
+        {
+            if(this.executor != null)
+            {
+                this.executor.Stop();
+            }
+        }
+
+        /// <summary>
+        /// Starts every registered consumer and then the session executor.
+        /// </summary>
+        public void Start()
+        {
+            // A synchronized Hashtable only protects individual operations, so the
+            // enumeration has to hold SyncRoot. Take a snapshot under the lock and call
+            // consumer.Start() outside of it so no foreign code runs while it is held.
+            MessageConsumer[] snapshot;
+            lock(this.consumers.SyncRoot)
+            {
+                snapshot = new MessageConsumer[this.consumers.Count];
+                this.consumers.Values.CopyTo(snapshot, 0);
+            }
+
+            foreach(MessageConsumer consumer in snapshot)
+            {
+                consumer.Start();
+            }
+
+            if(this.executor != null)
+            {
+                this.executor.Start();
+            }
+        }
+
+        /// <summary>
+        /// True while the session executor reports that it is running.
+        /// </summary>
+        public bool Started
+        {
+            get
+            {
+                return this.executor != null ? this.executor.Running : false;
+            }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Registers the consumer with this session and registers this session as the
+        /// dispatcher for the consumer's id. Ignored while the session is closing.
+        /// </summary>
+        public void AddConsumer(MessageConsumer consumer)
+        {
+            if(!this.closing)
+            {
+                ConsumerId id = consumer.ConsumerId;
+
+                // Registered with Connection before we register at the broker.
+                consumers[id] = consumer;
+                connection.AddDispatcher(id, this);
+            }
+        }
+
+        /// <summary>
+        /// Unregisters the consumer from the connection and, unless the session is
+        /// closing, from this session's consumer map.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): RemoveDispatcher is called twice, once with the consumer id and
+        /// once with the consumer itself - confirm against Connection that both overloads
+        /// exist and that both calls are intended.
+        /// </remarks>
+        public void RemoveConsumer(MessageConsumer consumer)
+        {
+            connection.RemoveDispatcher(consumer.ConsumerId);
+            if(!this.closing)
+            {
+                consumers.Remove(consumer.ConsumerId);
+            }
+            connection.RemoveDispatcher(consumer);
+        }
+
+        /// <summary>
+        /// Registers the producer with this session and with the connection. Ignored
+        /// while the session is closing.
+        /// </summary>
+        public void AddProducer(MessageProducer producer)
+        {
+            if(!this.closing)
+            {
+                ProducerId id = producer.ProducerId;
+
+                this.producers[id] = producer;
+                this.connection.AddProducer(id, producer);
+            }
+        }
+
+        /// <summary>
+        /// Unregisters the producer from the connection and, unless the session is
+        /// closing, from this session's producer map.
+        /// </summary>
+        public void RemoveProducer(ProducerId objectId)
+        {
+            connection.RemoveProducer(objectId);
+            if(!this.closing)
+            {
+                producers.Remove(objectId);
+            }
+        }
+
+        /// <summary>
+        /// Binds a newly created message to this session's connection and returns it.
+        /// </summary>
+        private MQTTMessage ConfigureMessage(MQTTMessage message)
+        {
+            message.Connection = this.connection;
+            return message;
+        }
+
+        /// <summary>
+        /// Throws IllegalStateException when the closed flag is set.
+        /// </summary>
+        private void CheckClosed()
+        {
+            if (closed)
+            {
+                throw new IllegalStateException("Session is Closed");
+            }
+        }
+
+        /// <summary>
+        /// Acknowledge handler that intentionally does nothing, so that calling
+        /// Acknowledge on a message has no effect. Currently unused in this class.
+        /// </summary>
+        /// <param name="message">
+        /// A <see cref="MQTTMessage"/>
+        /// </param>
+        private static void DoNothingAcknowledge(MQTTMessage message)
+        {
+        }
+
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using Apache.NMS.MQTT.Commands;
+using Apache.NMS.MQTT.Util;
+using Apache.NMS.MQTT.Threads;
+
+namespace Apache.NMS.MQTT
+{
+    /// <summary>
+    /// Task that drains a session's dispatch channel: messages handed to Execute /
+    /// ExecuteFirst are queued and later routed, one per Iterate call, to the consumer
+    /// registered under the dispatch's consumer id.
+    /// </summary>
+    public class SessionExecutor : Threads.Task
+    {
+        private readonly MessageDispatchChannel messageQueue = null;
+
+        // Created lazily in Wakeup and detached in Stop; both do so while holding
+        // messageQueue.SyncRoot.
+        private TaskRunner taskRunner = null;
+
+        private readonly Session session = null;
+        private readonly IDictionary consumers = null;
+
+        /// <summary>
+        /// Creates the executor for a session. A priority channel is used when the
+        /// session's connection reports MessagePrioritySupported, otherwise a FIFO channel.
+        /// </summary>
+        /// <param name="session">Owning session.</param>
+        /// <param name="consumers">The session's consumer map, keyed by consumer id.</param>
+        public SessionExecutor(Session session, IDictionary consumers)
+        {
+            this.session = session;
+            this.consumers = consumers;
+
+            if(this.session.Connection != null && this.session.Connection.MessagePrioritySupported)
+            {
+                this.messageQueue = new SimplePriorityMessageDispatchChannel();
+            }
+            else
+            {
+                this.messageQueue = new FifoMessageDispatchChannel();
+            }
+        }
+
+        ~SessionExecutor()
+        {
+            try
+            {
+                Stop();
+                Close();
+                Clear();
+            }
+            catch
+            {
+                // Best-effort cleanup; a finalizer must not throw.
+            }
+        }
+
+        /// <summary>
+        /// Queues the dispatch at the tail of the channel and wakes the task runner.
+        /// </summary>
+        public void Execute(MessageDispatch dispatch)
+        {
+            // Add the data to the queue.
+            this.messageQueue.Enqueue(dispatch);
+            this.Wakeup();
+        }
+
+        /// <summary>
+        /// Queues the dispatch at the head of the channel and wakes the task runner.
+        /// </summary>
+        public void ExecuteFirst(MessageDispatch dispatch)
+        {
+            // Add the data to the queue.
+            this.messageQueue.EnqueueFirst(dispatch);
+            this.Wakeup();
+        }
+
+        /// <summary>
+        /// Wakes the task runner, creating it on first use.
+        /// </summary>
+        public void Wakeup()
+        {
+            TaskRunner runner;
+
+            lock(messageQueue.SyncRoot)
+            {
+                if(this.taskRunner == null)
+                {
+                    this.taskRunner = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(this);
+                }
+
+                runner = this.taskRunner;
+            }
+
+            // Signal outside the lock, using the reference captured while it was held.
+            runner.Wakeup();
+        }
+
+        /// <summary>
+        /// Starts the channel if it is not running and wakes the runner when messages
+        /// are already waiting.
+        /// </summary>
+        public void Start()
+        {
+            if(!messageQueue.Running)
+            {
+                messageQueue.Start();
+
+                if(HasUncomsumedMessages)
+                {
+                    this.Wakeup();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Stops the channel and shuts the task runner down.
+        /// </summary>
+        public void Stop()
+        {
+            TaskRunner runner = StopChannelAndDetachRunner();
+
+            if(runner != null)
+            {
+                runner.Shutdown();
+            }
+        }
+
+        /// <summary>
+        /// Stops the channel and shuts the task runner down via
+        /// ShutdownWithAbort(timeout).
+        /// </summary>
+        public void Stop(TimeSpan timeout)
+        {
+            TaskRunner runner = StopChannelAndDetachRunner();
+
+            if(runner != null)
+            {
+                runner.ShutdownWithAbort(timeout);
+            }
+        }
+
+        /// <summary>
+        /// Stops the channel when it is running and hands back the current task runner
+        /// after clearing the field; returns null when the channel was not running or no
+        /// runner exists.
+        /// </summary>
+        private TaskRunner StopChannelAndDetachRunner()
+        {
+            if(!messageQueue.Running)
+            {
+                return null;
+            }
+
+            messageQueue.Stop();
+
+            // Same lock Wakeup uses when it creates the runner, so a runner created
+            // concurrently cannot be missed. The caller shuts the runner down after the
+            // lock has been released.
+            lock(messageQueue.SyncRoot)
+            {
+                TaskRunner runner = this.taskRunner;
+                this.taskRunner = null;
+                return runner;
+            }
+        }
+
+        /// <summary>
+        /// Closes the dispatch channel.
+        /// </summary>
+        public void Close()
+        {
+            this.messageQueue.Close();
+        }
+
+        /// <summary>
+        /// Hands the dispatch to the consumer registered under its consumer id. The
+        /// dispatch is dropped when no such consumer exists; exceptions are logged at
+        /// debug level and not propagated.
+        /// </summary>
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            try
+            {
+                MessageConsumer consumer = null;
+
+                lock(this.consumers.SyncRoot)
+                {
+                    if(this.consumers.Contains(dispatch.ConsumerId))
+                    {
+                        consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
+                    }
+                }
+
+                // If the consumer is not available, just ignore the message.
+                // Otherwise, dispatch the message to the consumer.
+                if(consumer != null)
+                {
+                    consumer.Dispatch(dispatch);
+                }
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+            }
+        }
+
+        /// <summary>
+        /// Performs one unit of work: first lets the consumers deliver what they have
+        /// queued, then dispatches at most one message from the session channel.
+        /// </summary>
+        /// <returns>
+        /// true when more work remains (or an exception was reported), false when there
+        /// was nothing to do or the channel is now empty.
+        /// </returns>
+        public bool Iterate()
+        {
+            try
+            {
+                lock(this.consumers.SyncRoot)
+                {
+                    // Deliver any messages queued on the consumer to their listeners.
+                    foreach( MessageConsumer consumer in this.consumers.Values )
+                    {
+                        if(consumer.Iterate())
+                        {
+                            return true;
+                        }
+                    }
+                }
+
+                // No messages left queued on the listeners.. so now dispatch messages
+                // queued on the session
+                MessageDispatch message = messageQueue.DequeueNoWait();
+
+                if(message != null)
+                {
+                    this.Dispatch(message);
+                    return !messageQueue.Empty;
+                }
+
+                return false;
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+
+                // The constructor tolerates a session without a connection, so the error
+                // path must too; otherwise this handler would itself throw.
+                if(this.session.Connection != null)
+                {
+                    this.session.Connection.OnSessionException(this.session, ex);
+                }
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Discards every message still queued on the channel.
+        /// </summary>
+        public void ClearMessagesInProgress()
+        {
+            this.messageQueue.Clear();
+        }
+
+        /// <summary>
+        /// Discards every message still queued on the channel.
+        /// </summary>
+        public void Clear()
+        {
+            this.messageQueue.Clear();
+        }
+
+        /// <summary>
+        /// Removes and returns all queued messages. Note that reading this property
+        /// empties the channel.
+        /// </summary>
+        public MessageDispatch[] UnconsumedMessages
+        {
+            get{ return messageQueue.RemoveAll(); }
+        }
+
+        /// <summary>
+        /// True when the channel is open, running and not empty. The name is misspelled
+        /// but kept for existing callers; see HasUnconsumedMessages.
+        /// </summary>
+        public bool HasUncomsumedMessages
+        {
+            get{ return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty; }
+        }
+
+        /// <summary>
+        /// Correctly spelled alias of HasUncomsumedMessages.
+        /// </summary>
+        public bool HasUnconsumedMessages
+        {
+            get{ return this.HasUncomsumedMessages; }
+        }
+
+        public bool Running
+        {
+            get{ return this.messageQueue.Running; }
+        }
+
+        public bool Empty
+        {
+            get{ return this.messageQueue.Empty; }
+        }
+
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+    /// <summary>
+    /// This class provides a wrapper around the ThreadPool mechanism in .NET
+    /// to allow for serial execution of jobs in the ThreadPool and provide
+    /// a means of shutting down the execution of jobs in a deterministic
+    /// way.
+    /// </summary>
+    /// <remarks>
+    /// All state (the work queue and the running / closing / closed flags) is guarded by
+    /// one monitor lock on syncRoot; executionComplete is signalled whenever no
+    /// processor callback is scheduled or running.
+    /// </remarks>
+    public class ThreadPoolExecutor
+    {
+        private readonly Queue<Future> workQueue = new Queue<Future>();
+
+        // Plain monitor object. This used to be a Mutex that was entered with lock() in
+        // some methods and with WaitOne() in AwaitTermination, which are two unrelated
+        // locks and therefore gave no mutual exclusion between those methods.
+        private readonly object syncRoot = new object();
+        private bool running = false;
+        private bool closing = false;
+        private bool closed = false;
+        private readonly ManualResetEvent executionComplete = new ManualResetEvent(true);
+        private Thread workThread = null;
+
+        /// <summary>
+        /// Represents an asynchronous task that is executed on the ThreadPool
+        /// at some point in the future.
+        /// </summary>
+        internal class Future
+        {
+            private readonly WaitCallback callback;
+            private readonly object callbackArg;
+
+            public Future(WaitCallback callback, object arg)
+            {
+                this.callback = callback;
+                this.callbackArg = arg;
+            }
+
+            /// <summary>
+            /// Invokes the callback with its argument. Exceptions thrown by the callback
+            /// are swallowed so that one failing job cannot stop the queue.
+            /// </summary>
+            public void Run()
+            {
+                if(this.callback == null)
+                {
+                    throw new Exception("Future executed with null WaitCallback");
+                }
+
+                try
+                {
+                    this.callback(callbackArg);
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        /// <summary>
+        /// Queues a job that takes no argument (the callback receives null).
+        /// </summary>
+        public void QueueUserWorkItem(WaitCallback worker)
+        {
+            this.QueueUserWorkItem(worker, null);
+        }
+
+        /// <summary>
+        /// Queues a job for serial execution on the ThreadPool. The job is silently
+        /// dropped when the executor has been shut down.
+        /// </summary>
+        /// <param name="worker">Callback to run; must not be null.</param>
+        /// <param name="arg">Argument handed to the callback.</param>
+        /// <exception cref="ArgumentNullException">worker is null.</exception>
+        public void QueueUserWorkItem(WaitCallback worker, object arg)
+        {
+            if(worker == null)
+            {
+                throw new ArgumentNullException("worker", "Invalid WaitCallback passed");
+            }
+
+            lock(syncRoot)
+            {
+                // Reject once Shutdown has been called. The previous condition
+                // (!closed || !closing) still accepted work while closing.
+                if(this.closed || this.closing)
+                {
+                    return;
+                }
+
+                this.workQueue.Enqueue(new Future(worker, arg));
+
+                if(!this.running)
+                {
+                    this.executionComplete.Reset();
+                    this.running = true;
+                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns true once Shutdown has been called. When a ThreadPoolExecutor
+        /// is shut down it will not accept any new tasks but it will complete all tasks
+        /// that have been previously queued.
+        /// </summary>
+        public bool IsShutdown
+        {
+            get { return this.closing; }
+        }
+
+        /// <summary>
+        /// Returns true if this ThreadPoolExecutor has been shut down and has also
+        /// completed processing of all outstanding tasks in its task Queue.
+        /// </summary>
+        public bool IsTerminated
+        {
+            get { return this.closed; }
+        }
+
+        /// <summary>
+        /// Stops accepting new jobs. Jobs that are already queued still run; the
+        /// executor is marked terminated immediately when nothing is running.
+        /// </summary>
+        public void Shutdown()
+        {
+            lock(this.syncRoot)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                this.closing = true;
+
+                // Must be no tasks in Queue and none can be accepted
+                // now that we've flipped the closing toggle so safe to
+                // mark this ThreadPoolExecutor as closed.
+                if(!this.running)
+                {
+                    this.closed = true;
+                    this.executionComplete.Set();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Waits up to the given timeout for all queued jobs to finish.
+        /// </summary>
+        /// <returns>The terminated state after waiting.</returns>
+        public bool AwaitTermination(TimeSpan timeout)
+        {
+            bool mustWait;
+
+            lock(this.syncRoot)
+            {
+                if(this.closed)
+                {
+                    return true;
+                }
+
+                // If called from the worker thread we can't wait as it
+                // will deadlock us, just return whatever the closed state is.
+                mustWait = this.running && Thread.CurrentThread != this.workThread;
+            }
+
+            if(mustWait)
+            {
+                // Wait outside the lock so the processor can make progress.
+                bool completed = this.executionComplete.WaitOne(timeout, false);
+
+                if(completed)
+                {
+                    lock(this.syncRoot)
+                    {
+                        this.closed = true;
+                    }
+                }
+            }
+
+            return this.closed;
+        }
+
+        /// <summary>
+        /// ThreadPool callback: runs exactly one queued job and re-queues itself while
+        /// more jobs are waiting, which keeps execution strictly serial.
+        /// </summary>
+        private void QueueProcessor(object unused)
+        {
+            Future theTask = null;
+
+            lock(syncRoot)
+            {
+                if(this.workQueue.Count == 0)
+                {
+                    MarkIdle();
+                    return;
+                }
+
+                this.workThread = Thread.CurrentThread;
+                theTask = this.workQueue.Dequeue();
+            }
+
+            try
+            {
+                theTask.Run();
+            }
+            finally
+            {
+                bool moreWork;
+
+                // The queue must be inspected under the lock: checking it unlocked let a
+                // producer enqueue between the check and running = false, stranding the
+                // new job with no processor scheduled.
+                lock(syncRoot)
+                {
+                    this.workThread = null;
+                    moreWork = this.workQueue.Count != 0;
+
+                    if(!moreWork)
+                    {
+                        MarkIdle();
+                    }
+                }
+
+                if(moreWork)
+                {
+                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Records that no job is running and signals waiters. When a shutdown is
+        /// pending the executor becomes terminated. Caller must hold syncRoot.
+        /// </summary>
+        private void MarkIdle()
+        {
+            this.running = false;
+
+            if(this.closing)
+            {
+                this.closed = true;
+            }
+
+            this.executionComplete.Set();
+        }
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Transport
+{
+    /// <summary>Handler invoked with a Command delivered by a transport.</summary>
+    public delegate void CommandHandler(ITransport sender, Command command);
+    /// <summary>Handler invoked with an Exception reported by a transport.</summary>
+    public delegate void ExceptionHandler(ITransport sender, Exception command);
+    /// <summary>Handler for a transport's Interrupted notification.</summary>
+    public delegate void InterruptedHandler(ITransport sender);
+    /// <summary>Handler for a transport's Resumed notification.</summary>
+    public delegate void ResumedHandler(ITransport sender);
+
+    /// <summary>
+    /// Represents the logical networking transport layer. Transports implement the low
+    /// level protocol specific portion of the communication between the Client and a Broker
+    /// such as TCP, UDP, etc. Transports make use of WireFormat objects to handle translating
+    /// the canonical Commands used in this client into binary wire level packets that
+    /// can be sent to the Broker or Service that the Transport connects to.
+    /// </summary>
+    public interface ITransport : IStartable, IDisposable, IStoppable
+    {
+        /// <summary>
+        /// Sends a Command object on the Wire but does not wait for any response from the
+        /// receiver before returning.
+        /// </summary>
+        /// <param name="command">
+        /// A <see cref="Command"/>
+        /// </param>
+        void Oneway(Command command);
+
+        /// <summary>
+        /// Sends a Command object which requires a response from the Broker but does not
+        /// wait for the response, instead a FutureResponse object is returned that the
+        /// caller can use to wait on the Broker's response.
+        /// </summary>
+        FutureResponse AsyncRequest(Command command);
+
+        /// <summary>
+        /// Sends a Command to the Broker and waits for a Response to that Command before
+        /// returning, this version waits indefinitely for a response.
+        /// </summary>
+        Response Request(Command command);
+
+        /// <summary>
+        /// Sends a Command to the Broker and waits for the given TimeSpan to expire for a
+        /// response before returning.
+        /// </summary>
+        Response Request(Command command, TimeSpan timeout);
+
+        /// <summary>
+        /// Allows a caller to find a specific type of Transport in the Chain of
+        /// Transports that is created. This allows a caller to find a specific
+        /// object in the Transport chain and set or get properties on that specific
+        /// instance. If the requested type isn't in the chain then Null is returned.
+        /// </summary>
+        Object Narrow(Type type);
+
+        /// <summary>
+        /// Timeout in milliseconds to wait for sending synchronous messages or commands.
+        /// Set to -1 for infinite timeout.
+        /// </summary>
+        int Timeout
+        {
+            get;
+            set;
+        }
+
+        /// <summary>
+        /// Timeout in milliseconds to wait for sending asynchronous messages or commands.
+        /// Set to -1 for infinite timeout.
+        /// </summary>
+        int AsyncTimeout
+        {
+            get;
+            set;
+        }
+
+        /// <value>
+        /// Handler the transport holds for Commands.
+        /// </value>
+        CommandHandler Command
+        {
+            get;
+            set;
+        }
+
+        /// <value>
+        /// Handler the transport holds for Exceptions.
+        /// </value>
+        ExceptionHandler Exception
+        {
+            get;
+            set;
+        }
+
+        /// <value>
+        /// Handler the transport holds for the Interrupted notification.
+        /// </value>
+        InterruptedHandler Interrupted
+        {
+            get;
+            set;
+        }
+
+        /// <value>
+        /// Handler the transport holds for the Resumed notification.
+        /// </value>
+        ResumedHandler Resumed
+        {
+            get;
+            set;
+        }
+
+        /// <value>
+        /// Indicates if this Transport has already been disposed and can no longer
+        /// be used.
+        /// </value>
+        bool IsDisposed
+        {
+            get;
+        }
+
+        /// <value>
+        /// Indicates if this Transport is Fault Tolerant or not. A fault Tolerant
+        /// Transport handles low level connection errors internally allowing a client
+        /// to remain unaware of wire level disconnection and reconnection details.
+        /// </value>
+        bool IsFaultTolerant
+        {
+            get;
+        }
+
+        /// <value>
+        /// Indicates if the Transport is currently connected to its assigned URI.
+        /// </value>
+        bool IsConnected
+        {
+            get;
+        }
+
+        /// <value>
+        /// The Remote Address that this transport is currently connected to.
+        /// </value>
+        Uri RemoteAddress
+        {
+            get;
+        }
+
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransport.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.MQTT.Transport
+{
+ /// <summary>
+ /// Callback that receives a transport together with a URI.
+ /// NOTE(review): presumably invoked by ITransportFactory.CompositeConnect for the
+ /// transport it creates - confirm against the factory implementation.
+ /// </summary>
+ public delegate void SetTransport(ITransport transport, Uri uri);
+
+ /// <summary>
+ /// Factory that produces ITransport instances for a given location URI.
+ /// NOTE(review): the difference between CreateTransport and CompositeConnect is not
+ /// visible here; document it once an implementation exists.
+ /// </summary>
+ public interface ITransportFactory
+ {
+ /// <summary>Returns a transport for the given location.</summary>
+ ITransport CreateTransport(Uri location);
+ /// <summary>Returns a transport for the given location.</summary>
+ ITransport CompositeConnect(Uri location);
+ /// <summary>
+ /// Returns a transport for the given location; additionally takes a SetTransport callback.
+ /// </summary>
+ ITransport CompositeConnect(Uri location, SetTransport setTransport);
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Transport/ITransportFactory.cs
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Util
+{
+    /// <summary>
+    /// A FIFO based MessageDispatchChannel. Messages are kept in a linked list and
+    /// every member synchronizes on a single monitor object, which is also exposed
+    /// through SyncRoot.
+    /// </summary>
+    public class FifoMessageDispatchChannel : MessageDispatchChannel
+    {
+        // Plain monitor object. This used to be a Mutex that was only ever used with
+        // lock() / Monitor, which allocated a disposable wait handle that was never
+        // disposed and whose mutex semantics were never used.
+        private readonly object mutex = new object();
+        private bool closed;
+        private bool running;
+        private readonly LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
+
+        #region Properties
+
+        /// <summary>
+        /// The object all members of this channel lock on.
+        /// </summary>
+        public object SyncRoot
+        {
+            get{ return this.mutex; }
+        }
+
+        public bool Closed
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.closed;
+                }
+            }
+
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.closed = value;
+                }
+            }
+        }
+
+        public bool Running
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.running;
+                }
+            }
+
+            set
+            {
+                lock(this.mutex)
+                {
+                    this.running = value;
+                }
+            }
+        }
+
+        public bool Empty
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.channel.Count == 0;
+                }
+            }
+        }
+
+        public long Count
+        {
+            get
+            {
+                lock(this.mutex)
+                {
+                    return this.channel.Count;
+                }
+            }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Marks the channel as running unless it has been closed, and wakes all waiters.
+        /// </summary>
+        public void Start()
+        {
+            lock(this.mutex)
+            {
+                if(!this.closed)
+                {
+                    this.running = true;
+                    Monitor.PulseAll(this.mutex);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Marks the channel as not running and wakes all waiters.
+        /// </summary>
+        public void Stop()
+        {
+            lock(this.mutex)
+            {
+                this.running = false;
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Marks the channel as closed and not running, and wakes all waiters. Queued
+        /// messages are left in place.
+        /// </summary>
+        public void Close()
+        {
+            lock(this.mutex)
+            {
+                if(!this.closed)
+                {
+                    this.running = false;
+                    this.closed = true;
+                }
+
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Appends the dispatch to the tail of the channel and wakes one waiter.
+        /// </summary>
+        public void Enqueue(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddLast(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Inserts the dispatch at the head of the channel and wakes one waiter.
+        /// </summary>
+        public void EnqueueFirst(MessageDispatch dispatch)
+        {
+            lock(this.mutex)
+            {
+                this.channel.AddFirst(dispatch);
+                Monitor.Pulse(this.mutex);
+            }
+        }
+
+        /// <summary>
+        /// Removes and returns the head message, waiting at most once, for up to the
+        /// given timeout, when the channel is open but empty or not running.
+        /// </summary>
+        /// <param name="timeout">Maximum wait; TimeSpan.Zero means do not wait.</param>
+        /// <returns>
+        /// The head message, or null when the channel is closed, not running or empty
+        /// after the wait.
+        /// </returns>
+        public MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            lock(this.mutex)
+            {
+                // Wait until the channel is ready to deliver messages.
+                bool notReady = this.channel.Count == 0 || !this.running;
+                if( timeout != TimeSpan.Zero && !this.closed && notReady )
+                {
+                    Monitor.Wait(this.mutex, timeout);
+                }
+
+                return TakeFirstOrNull();
+            }
+        }
+
+        /// <summary>
+        /// Removes and returns the head message without waiting; null when the channel
+        /// is closed, not running or empty.
+        /// </summary>
+        public MessageDispatch DequeueNoWait()
+        {
+            lock(this.mutex)
+            {
+                return TakeFirstOrNull();
+            }
+        }
+
+        /// <summary>
+        /// Returns the head message without removing it; null when the channel is
+        /// closed, not running or empty.
+        /// </summary>
+        public MessageDispatch Peek()
+        {
+            lock(this.mutex)
+            {
+                if( this.closed || !this.running || this.channel.Count == 0 )
+                {
+                    return null;
+                }
+
+                return this.channel.First.Value;
+            }
+        }
+
+        /// <summary>
+        /// Discards all queued messages.
+        /// </summary>
+        public void Clear()
+        {
+            lock(this.mutex)
+            {
+                this.channel.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Removes all queued messages and returns them in queue order.
+        /// </summary>
+        public MessageDispatch[] RemoveAll()
+        {
+            lock(this.mutex)
+            {
+                MessageDispatch[] result = new MessageDispatch[this.channel.Count];
+                this.channel.CopyTo(result, 0);
+                this.channel.Clear();
+                return result;
+            }
+        }
+
+        /// <summary>
+        /// Removes and returns the head message, or null when the channel is closed,
+        /// not running or empty. Caller must hold the channel lock.
+        /// </summary>
+        private MessageDispatch TakeFirstOrNull()
+        {
+            if( this.closed || !this.running || this.channel.Count == 0 )
+            {
+                return null;
+            }
+
+            MessageDispatch result = this.channel.First.Value;
+            this.channel.RemoveFirst();
+            return result;
+        }
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.Util;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Util
+{
+    /// <summary>
+    /// MessageTransformation specialization for the MQTT provider. Every
+    /// message factory except the bytes-message one rejects the request with a
+    /// NotSupportedException, and destinations are converted through
+    /// MQTTDestination.Transform.
+    /// </summary>
+    public class MQTTMessageTransformation : MessageTransformation
+    {
+        // Stored at construction; not read anywhere inside this class.
+        private readonly Connection connection;
+
+        /// <summary>
+        /// Creates a transformation bound to the given connection.
+        /// </summary>
+        /// <param name="connection">The connection this transformation is associated with.</param>
+        public MQTTMessageTransformation(Connection connection) : base()
+        {
+            this.connection = connection;
+        }
+
+        #region Creation Methods and Conversion Support Methods
+
+        // The exception type is written fully qualified throughout because this
+        // file does not import the System namespace.
+
+        /// <summary>Always throws; empty messages are not supported.</summary>
+        /// <exception cref="System.NotSupportedException">Always thrown.</exception>
+        protected override IMessage DoCreateMessage()
+        {
+            throw new System.NotSupportedException("MQTT Cannot process Empty Messages");
+        }
+
+        /// <summary>Bytes message factory; currently returns null.</summary>
+        /// <returns>Always null.</returns>
+        protected override IBytesMessage DoCreateBytesMessage()
+        {
+            // NOTE(review): returns null instead of a message instance or an
+            // exception; looks like an unfinished stub - confirm callers cope.
+            return null;
+        }
+
+        /// <summary>Always throws; text messages are not supported.</summary>
+        /// <exception cref="System.NotSupportedException">Always thrown.</exception>
+        protected override ITextMessage DoCreateTextMessage()
+        {
+            throw new System.NotSupportedException("MQTT Cannot process Text Messages");
+        }
+
+        /// <summary>Always throws; stream messages are not supported.</summary>
+        /// <exception cref="System.NotSupportedException">Always thrown.</exception>
+        protected override IStreamMessage DoCreateStreamMessage()
+        {
+            throw new System.NotSupportedException("MQTT Cannot process Stream Messages");
+        }
+
+        /// <summary>Always throws; map messages are not supported.</summary>
+        /// <exception cref="System.NotSupportedException">Always thrown.</exception>
+        protected override IMapMessage DoCreateMapMessage()
+        {
+            throw new System.NotSupportedException("MQTT Cannot process Map Messages");
+        }
+
+        /// <summary>Always throws; object messages are not supported.</summary>
+        /// <exception cref="System.NotSupportedException">Always thrown.</exception>
+        protected override IObjectMessage DoCreateObjectMessage()
+        {
+            throw new System.NotSupportedException("MQTT Cannot process Object Messages");
+        }
+
+        /// <summary>
+        /// Converts a destination by delegating to MQTTDestination.Transform.
+        /// </summary>
+        /// <param name="destination">The destination to convert.</param>
+        /// <returns>Whatever MQTTDestination.Transform returns for the input.</returns>
+        protected override IDestination DoTransformDestination(IDestination destination)
+        {
+            return MQTTDestination.Transform(destination);
+        }
+
+        /// <summary>
+        /// Post-processing hook; intentionally empty, no extra work is performed.
+        /// </summary>
+        /// <param name="message">The message handed to the hook (unused).</param>
+        protected override void DoPostProcessMessage(IMessage message)
+        {
+        }
+
+        #endregion
+    }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MQTTMessageTransformation.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=1542115&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Thu Nov 14 23:18:56 2013
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.MQTT.Commands;
+
+namespace Apache.NMS.MQTT.Util
+{
+    /// <summary>
+    /// Contract for a channel that buffers incoming MessageDispatch instances
+    /// on their way to a Session or MessageConsumer. The order in which
+    /// messages leave the channel is up to the implementation: one may deliver
+    /// in FIFO order, another may sort messages by their Message Priority.
+    /// </summary>
+    public interface MessageDispatchChannel
+    {
+        /// <summary>Object exposed for synchronizing with the channel.</summary>
+        object SyncRoot { get; }
+
+        /// <summary>Gets or sets whether the channel is closed.</summary>
+        bool Closed { get; set; }
+
+        /// <summary>Gets or sets whether the channel is running.</summary>
+        bool Running { get; set; }
+
+        /// <summary>Gets whether the channel currently holds no dispatches.</summary>
+        bool Empty { get; }
+
+        /// <summary>Gets the number of dispatches currently held.</summary>
+        long Count { get; }
+
+        /// <summary>Starts the channel.</summary>
+        void Start();
+
+        /// <summary>Stops the channel.</summary>
+        void Stop();
+
+        /// <summary>Closes the channel.</summary>
+        void Close();
+
+        /// <summary>Adds a dispatch to the channel.</summary>
+        void Enqueue(MessageDispatch dispatch);
+
+        /// <summary>Adds a dispatch at the front of the channel.</summary>
+        void EnqueueFirst(MessageDispatch dispatch);
+
+        /// <summary>
+        /// Removes and returns the next dispatch, using the given timeout.
+        /// </summary>
+        MessageDispatch Dequeue(TimeSpan timeout);
+
+        /// <summary>Removes and returns the next dispatch without waiting.</summary>
+        MessageDispatch DequeueNoWait();
+
+        /// <summary>Returns the next dispatch without removing it.</summary>
+        MessageDispatch Peek();
+
+        /// <summary>Removes every dispatch held by the channel.</summary>
+        void Clear();
+
+        /// <summary>Removes every dispatch and returns them as an array.</summary>
+        MessageDispatch[] RemoveAll();
+    }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
------------------------------------------------------------------------------
svn:eol-style = native