You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/16 22:51:22 UTC

svn commit: r1542594 - in /activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp: Connection.cs MessageConsumer.cs MessageProducer.cs Messages/MQTTMessage.cs Session.cs SessionExecutor.cs Threads/DefaultThreadPools.cs Threads/PooledTaskRunner.cs

Author: tabish
Date: Sat Nov 16 21:51:21 2013
New Revision: 1542594

URL: http://svn.apache.org/r1542594
Log:
https://issues.apache.org/jira/browse/AMQNET-458

Implementation

Added:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Sat Nov 16 21:51:21 2013
@@ -37,6 +37,7 @@ namespace Apache.NMS.MQTT
 		private readonly Uri brokerUri;
         private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
         private readonly object myLock = new object();
         private readonly Atomic<bool> connected = new Atomic<bool>(false);
         private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -324,14 +325,14 @@ namespace Apache.NMS.MQTT
 //				this.producers.Add(id, producer);
 //			}
 //		}
-//
-//		internal void RemoveProducer(ProducerId id)
-//		{
-//			if(!this.closing.Value)
-//			{
-//				this.producers.Remove(id);
-//			}
-//		}
+
+		internal void RemoveProducer(int id)
+		{
+			if(!this.closing.Value)
+			{
+				this.producers.Remove(id);
+			}
+		}
 
 	    internal void RemoveDispatcher(IDispatcher dispatcher) 
 		{
@@ -388,6 +389,21 @@ namespace Apache.NMS.MQTT
 		{
 		}
 
+		internal void OnSessionException(Session sender, Exception exception)
+		{
+			if(ExceptionListener != null)
+			{
+				try
+				{
+					ExceptionListener(exception);
+				}
+				catch
+				{
+					sender.Close();
+				}
+			}
+		}
+
 		protected void CheckClosedOrFailed()
 		{
 			CheckClosed();

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Sat Nov 16 21:51:21 2013
@@ -37,6 +37,7 @@ namespace Apache.NMS.MQTT
         private Exception failureError;
 		private ThreadPoolExecutor executor;
 		private int consumerId;
+		protected bool disposed = false;
 
 		private event MessageListener listener;
 
@@ -113,6 +114,71 @@ namespace Apache.NMS.MQTT
 			this.unconsumedMessages.Stop();
 		}
 
+		public virtual void Close()
+		{
+			if(!this.unconsumedMessages.Closed)
+			{
+                Tracer.DebugFormat("Consumer {0} closing normally.", this.ConsumerId);
+                this.DoClose();
+			}
+		}
+
+		internal void DoClose()
+		{
+	        Shutdown();
+	        //this.session.Connection.Oneway(removeCommand);
+		}
+
+		/// <summary>
+		/// Called from the parent Session of this Consumer to indicate that its
+		/// parent session is closing and this Consumer should close down but not
+		/// send any message to the Broker as the parent close will take care of
+		/// removing its child resources at the broker.
+		/// </summary>
+		internal void Shutdown()
+		{
+			if(!this.unconsumedMessages.Closed)
+			{
+				if(Tracer.IsDebugEnabled)
+				{
+					Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
+				}
+
+				// Do we have any acks we need to send out before closing?
+				// Ack any delivered messages now.
+				if(!this.session.IsTransacted)
+				{
+				}
+
+	            if (this.executor != null) 
+				{
+					this.executor.Shutdown();
+					this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
+					this.executor = null;
+	            }
+
+	            if (this.session.IsClientAcknowledge)
+				{
+	            }
+
+				if(!this.session.IsTransacted)
+				{
+					lock(this.dispatchedMessages)
+					{
+						dispatchedMessages.Clear();
+					}
+				}
+
+				this.session.RemoveConsumer(this);
+				this.unconsumedMessages.Close();
+
+				if(Tracer.IsDebugEnabled)
+				{
+					Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
+				}
+			}			
+		}
+
 		public bool Iterate()
 		{
 			if(this.listener != null)
@@ -128,6 +194,146 @@ namespace Apache.NMS.MQTT
 			return false;
 		}
 
+		public IMessage Receive()
+		{
+			CheckClosed();
+			CheckMessageListener();
+
+			MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+
+			if(dispatch == null)
+			{
+				return null;
+			}
+
+			BeforeMessageIsConsumed(dispatch);
+			AfterMessageIsConsumed(dispatch, false);
+
+			return CreateMQTTMessage(dispatch);
+		}
+
+		public IMessage Receive(TimeSpan timeout)
+		{
+			CheckClosed();
+			CheckMessageListener();
+
+			MessageDispatch dispatch = null;
+			dispatch = this.Dequeue(timeout);
+
+			if(dispatch == null)
+			{
+				return null;
+			}
+
+			BeforeMessageIsConsumed(dispatch);
+			AfterMessageIsConsumed(dispatch, false);
+
+			return CreateMQTTMessage(dispatch);
+		}
+
+		public IMessage ReceiveNoWait()
+		{
+			CheckClosed();
+			CheckMessageListener();
+
+			MessageDispatch dispatch = null;
+			dispatch = this.Dequeue(TimeSpan.Zero);
+
+			if(dispatch == null)
+			{
+				return null;
+			}
+
+			BeforeMessageIsConsumed(dispatch);
+			AfterMessageIsConsumed(dispatch, false);
+
+			return CreateMQTTMessage(dispatch);
+		}
+
+		public virtual void Dispatch(MessageDispatch dispatch)
+		{
+		}
+
+		/// <summary>
+		/// Used to get an enqueued message from the unconsumedMessages list. The
+		/// amount of time this method blocks is based on the timeout value.  if
+		/// timeout == Timeout.Infinite then it blocks until a message is received.
+		/// if timeout == 0 then it it tries to not block at all, it returns a
+		/// message if it is available if timeout > 0 then it blocks up to timeout
+		/// amount of time.  Expired messages will consumed by this method.
+		/// </summary>
+		/// <param name="timeout">
+		/// A <see cref="System.TimeSpan"/>
+		/// </param>
+		/// <returns>
+		/// A <see cref="MessageDispatch"/>
+		/// </returns>
+		private MessageDispatch Dequeue(TimeSpan timeout)
+		{
+			DateTime deadline = DateTime.Now;
+
+			if(timeout > TimeSpan.Zero)
+			{
+				deadline += timeout;
+			}
+
+			while(true)
+			{
+				MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
+
+				// Grab a single date/time for calculations to avoid timing errors.
+				DateTime dispatchTime = DateTime.Now;
+
+				if(dispatch == null)
+				{
+					if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
+					{
+						if(dispatchTime > deadline)
+						{
+							// Out of time.
+							timeout = TimeSpan.Zero;
+						}
+						else
+						{
+							// Adjust the timeout to the remaining time.
+							timeout = deadline - dispatchTime;
+						}
+					}
+					else
+					{
+                        // Informs the caller of an error in the event that an async exception
+                        // took down the parent connection.
+                        if(this.failureError != null)
+                        {
+                            throw NMSExceptionSupport.Create(this.failureError);
+                        }
+
+						return null;
+					}
+				}
+				else if(dispatch.Message == null)
+				{
+					return null;
+				}
+				else
+				{
+					return dispatch;
+				}
+			}
+		}
+
+		public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
+		{
+		}
+
+		public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+		{
+			if(this.unconsumedMessages.Closed)
+			{
+				return;
+			}
+		}
+
 		private void CheckClosed()
 		{
 			if(this.unconsumedMessages.Closed)
@@ -140,7 +346,7 @@ namespace Apache.NMS.MQTT
 		{
 			if(this.listener != null)
 			{
-				throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+				throw new NMSException("Cannot perform a Sync receive on a MessageConsumer that has an async listener");
 			}
 		}
 
@@ -159,11 +365,105 @@ namespace Apache.NMS.MQTT
 			get { return this.session.IsClientAcknowledge; }
 		}
 
+		private MQTTMessage CreateMQTTMessage(MessageDispatch dispatch)
+		{
+			MQTTMessage message = dispatch.Message.Clone() as MQTTMessage;
+
+			if(this.ConsumerTransformer != null)
+			{
+				IMessage newMessage = ConsumerTransformer(this.session, this, message);
+				if(newMessage != null)
+				{
+					message = this.messageTransformation.TransformMessage<MQTTMessage>(newMessage);
+				}
+			}
+
+			message.Connection = this.session.Connection;
+
+			if(IsClientAcknowledge)
+			{
+				message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
+			}
+			else if(IsIndividualAcknowledge)
+			{
+				message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge);
+			}
+			else
+			{
+				message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+			}
+
+			return message;
+		}
+
+		protected void DoIndividualAcknowledge(MQTTMessage message)
+		{
+			MessageDispatch dispatch = null;
+
+			lock(this.dispatchedMessages)
+			{
+				foreach(MessageDispatch originalDispatch in this.dispatchedMessages)
+				{
+					if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+					{
+						dispatch = originalDispatch;
+						this.dispatchedMessages.Remove(originalDispatch);
+						break;
+					}
+				}
+			}
+
+			if(dispatch == null)
+			{
+				Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
+				return;
+			}
+
+//			MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+//			Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
+//			this.session.SendAck(ack);
+		}
+
+		protected void DoNothingAcknowledge(MQTTMessage message)
+		{
+		}
+
+		protected void DoClientAcknowledge(MQTTMessage message)
+		{
+			this.CheckClosed();
+			Tracer.Debug("Sending Client Ack:");
+//			this.session.Acknowledge();
+		}
+
 	    internal bool Closed
 	    {
             get { return this.unconsumedMessages.Closed; }
 	    }
 
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			try
+			{
+				Close();
+			}
+			catch
+			{
+				// Ignore network errors.
+			}
+
+			disposed = true;
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Sat Nov 16 21:51:21 2013
@@ -32,6 +32,7 @@ namespace Apache.NMS.MQTT
 		private TimeSpan requestTimeout;
 		protected bool disposed = false;
 		private int producerId;
+		private Topic destination;
 
 		private readonly MessageTransformation messageTransformation;
 
@@ -60,6 +61,41 @@ namespace Apache.NMS.MQTT
 			set { this.requestTimeout = value; }
 		}
 
+		public MsgDeliveryMode DeliveryMode
+		{
+			get { return msgDeliveryMode; }
+			set { this.msgDeliveryMode = value; }
+		}
+
+		public TimeSpan TimeToLive
+		{
+			get { return TimeSpan.MaxValue; }
+			set {}
+		}
+
+		public MsgPriority Priority
+		{
+			get { return MsgPriority.Normal; }
+			set {}
+		}
+
+		public bool DisableMessageID
+		{
+			get { return false; }
+			set {}
+		}
+
+		public bool DisableMessageTimestamp
+		{
+			get { return false; }
+			set {}
+		}
+
+		public Topic Destination
+		{
+			get { return this.destination; }
+		}
+
 		#endregion
 
 		public void Dispose()
@@ -119,7 +155,7 @@ namespace Apache.NMS.MQTT
 
 				try
 				{
-					session.RemoveProducer(info.ProducerId);
+					session.RemoveProducer(ProducerId);
 				}
 				catch(Exception ex)
 				{
@@ -132,25 +168,25 @@ namespace Apache.NMS.MQTT
 
 		public void Send(IMessage message)
 		{
-			Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+			Send(Destination, message, this.msgDeliveryMode);
 		}
 
 		public void Send(IDestination destination, IMessage message)
 		{
-			Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+			Send(destination, message, this.msgDeliveryMode);
 		}
 
 		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+			Send(Destination, message, deliveryMode);
 		}
 
 		public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			Send(destination, message, deliveryMode, priority, timeToLive, true);
+			Send(destination, message, deliveryMode);
 		}
 
-		protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+		protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode)
 		{
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Sat Nov 16 21:51:21 2013
@@ -30,6 +30,7 @@ namespace Apache.NMS.MQTT.Messages
 		private PrimitiveMap properties;
 		private Connection connection;
 		private Topic destination;
+		private short messageId;
 
 		public event AcknowledgeHandler Acknowledger;
 
@@ -92,9 +93,6 @@ namespace Apache.NMS.MQTT.Messages
 
 		public virtual void ClearProperties()
 		{
-			this.MarshalledProperties = null;
-			this.ReadOnlyProperties = false;
-			this.Properties.Clear();
 		}
 
 		protected void FailIfReadOnlyBody()
@@ -299,6 +297,12 @@ namespace Apache.NMS.MQTT.Messages
 			set {  }
 		}
 
+		public int MessageId
+		{
+			get { return this.messageId; }
+			set { this.messageId = value; }
+		}
+
 		#endregion
 
 		public object GetObjectProperty(string name)

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Sat Nov 16 21:51:21 2013
@@ -438,14 +438,14 @@ namespace Apache.NMS.MQTT
             }
         }
 
-//        public void RemoveProducer(ProducerId objectId)
-//        {
-//            connection.RemoveProducer(objectId);
-//            if(!this.closing)
-//            {
-//                producers.Remove(objectId);
-//            }
-//        }
+        public void RemoveProducer(int producerId)
+        {
+            connection.RemoveProducer(producerId);
+            if(!this.closing)
+            {
+                producers.Remove(producerId);
+            }
+        }
 
         public void Dispatch(MessageDispatch dispatch)
         {

Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Sat Nov 16 21:51:21 2013
@@ -35,15 +35,7 @@ namespace Apache.NMS.MQTT
         {
             this.session = session;
             this.consumers = consumers;
-
-            if(this.session.Connection != null && this.session.Connection.MessagePrioritySupported)
-            {
-               this.messageQueue = new SimplePriorityMessageDispatchChannel();
-            }
-            else
-            {
-                this.messageQueue = new FifoMessageDispatchChannel();
-            }
+            this.messageQueue = new FifoMessageDispatchChannel();
         }
 
         ~SessionExecutor()
@@ -146,10 +138,10 @@ namespace Apache.NMS.MQTT
 
                 lock(this.consumers.SyncRoot)
                 {
-                    if(this.consumers.Contains(dispatch.ConsumerId))
-                    {
-                        consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
-                    }
+//                    if(this.consumers.Contains(dispatch.ConsumerId))
+//                    {
+//                        consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
+//                    }
                 }
 				
                 // If the consumer is not available, just ignore the message.

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=1542594&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs Sat Nov 16 21:51:21 2013
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MQTT.Threads
+{
+	public class DefaultThreadPools
+	{
+		private static readonly TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
+
+		private DefaultThreadPools()
+		{
+		}
+
+		public static TaskRunnerFactory DefaultTaskRunnerFactory
+		{
+			get { return taskRunnerFactory; }
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=1542594&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Sat Nov 16 21:51:21 2013
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+	class PooledTaskRunner : TaskRunner
+	{
+		private readonly int maxIterationsPerRun;
+		private readonly Task task;
+		private readonly Object runable = new Object();
+		private bool queued;
+		private bool _shutdown;
+		private bool iterating;
+		private volatile System.Threading.Thread runningThread;
+
+		public void Run(Object o)
+		{
+			PooledTaskRunner p = o as PooledTaskRunner;
+			p.runningThread = System.Threading.Thread.CurrentThread;
+			try
+			{
+				p.RunTask();
+			}
+			finally
+			{
+				p.runningThread = null;
+			}
+		}
+
+		public PooledTaskRunner(Task task, int maxIterationsPerRun)
+		{
+			this.maxIterationsPerRun = maxIterationsPerRun;
+			this.task = task;
+			this._shutdown = false;
+			this.iterating = false;
+			this.queued = true;
+			ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+		}
+
+		/// <summary>
+		/// We Expect MANY wakeup calls on the same TaskRunner.
+		/// </summary>
+		public void Wakeup()
+		{
+			lock(runable)
+			{
+				// When we get in here, we make some assumptions of state:
+				// queued=false, iterating=false: wakeup() has not be called and
+				// therefore task is not executing.
+				// queued=true, iterating=false: wakeup() was called but, task
+				// execution has not started yet
+				// queued=false, iterating=true : wakeup() was called, which caused
+				// task execution to start.
+				// queued=true, iterating=true : wakeup() called after task
+				// execution was started.
+
+				if(queued || _shutdown)
+				{
+					return;
+				}
+
+				queued = true;
+
+				// The runTask() method will do this for me once we are done
+				// iterating.
+				if(!iterating)
+				{
+					ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+				}
+			}
+		}
+
+		/// <summary>
+		/// shut down the task
+		/// </summary>
+		/// <param name="timeout"></param>
+		public void Shutdown(TimeSpan timeout)
+		{
+			lock(runable)
+			{
+				_shutdown = true;
+				// the check on the thread is done
+				// because a call to iterate can result in
+				// shutDown() being called, which would wait forever
+				// waiting for iterating to finish
+				if(runningThread != System.Threading.Thread.CurrentThread)
+				{
+					if(iterating)
+					{
+						System.Threading.Thread.Sleep(timeout);
+					}
+				}
+			}
+		}
+
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(runable)
+            {
+                _shutdown = true;
+
+                if(runningThread != System.Threading.Thread.CurrentThread)
+                {
+                    if(iterating)
+                    {
+                        System.Threading.Thread.Sleep(timeout);
+                    }
+
+                    if(iterating)
+                    {
+                        runningThread.Abort();
+                    }
+                }
+            }
+        }
+
+		public void Shutdown()
+		{
+			Shutdown(new TimeSpan(Timeout.Infinite));
+		}
+
+		internal void RunTask()
+		{
+			lock(runable)
+			{
+				queued = false;
+				if(_shutdown)
+				{
+					iterating = false;
+					return;
+				}
+				iterating = true;
+			}
+
+			// Don't synchronize while we are iterating so that
+			// multiple wakeup() calls can be executed concurrently.
+			bool done = false;
+			try
+			{
+				for(int i = 0; i < maxIterationsPerRun; i++)
+				{
+					if(!task.Iterate())
+					{
+						done = true;
+						break;
+					}
+				}
+			}
+			finally
+			{
+				lock(runable)
+				{
+					iterating = false;
+					if(_shutdown)
+					{
+						queued = false;
+					}
+					else
+					{
+						// If we could not iterate all the items
+						// then we need to re-queue.
+						if(!done)
+						{
+							queued = true;
+						}
+
+						if(queued)
+						{
+							ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+						}
+					}
+				}
+			}
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
------------------------------------------------------------------------------
    svn:eol-style = native