You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/11/16 22:51:22 UTC
svn commit: r1542594 - in
/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp:
Connection.cs MessageConsumer.cs MessageProducer.cs Messages/MQTTMessage.cs
Session.cs SessionExecutor.cs Threads/DefaultThreadPools.cs
Threads/PooledTaskRunner.cs
Author: tabish
Date: Sat Nov 16 21:51:21 2013
New Revision: 1542594
URL: http://svn.apache.org/r1542594
Log:
https://issues.apache.org/jira/browse/AMQNET-458
Implementation
Added:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (with props)
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Connection.cs Sat Nov 16 21:51:21 2013
@@ -37,6 +37,7 @@ namespace Apache.NMS.MQTT
private readonly Uri brokerUri;
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
private readonly Atomic<bool> connected = new Atomic<bool>(false);
private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -324,14 +325,14 @@ namespace Apache.NMS.MQTT
// this.producers.Add(id, producer);
// }
// }
-//
-// internal void RemoveProducer(ProducerId id)
-// {
-// if(!this.closing.Value)
-// {
-// this.producers.Remove(id);
-// }
-// }
+
+ internal void RemoveProducer(int id)
+ {
+ if(!this.closing.Value)
+ {
+ this.producers.Remove(id);
+ }
+ }
internal void RemoveDispatcher(IDispatcher dispatcher)
{
@@ -388,6 +389,21 @@ namespace Apache.NMS.MQTT
{
}
+ internal void OnSessionException(Session sender, Exception exception)
+ {
+ if(ExceptionListener != null)
+ {
+ try
+ {
+ ExceptionListener(exception);
+ }
+ catch
+ {
+ sender.Close();
+ }
+ }
+ }
+
protected void CheckClosedOrFailed()
{
CheckClosed();
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageConsumer.cs Sat Nov 16 21:51:21 2013
@@ -37,6 +37,7 @@ namespace Apache.NMS.MQTT
private Exception failureError;
private ThreadPoolExecutor executor;
private int consumerId;
+ protected bool disposed = false;
private event MessageListener listener;
@@ -113,6 +114,71 @@ namespace Apache.NMS.MQTT
this.unconsumedMessages.Stop();
}
+ public virtual void Close()
+ {
+ if(!this.unconsumedMessages.Closed)
+ {
+ Tracer.DebugFormat("Consumer {0} closing normally.", this.ConsumerId);
+ this.DoClose();
+ }
+ }
+
+ internal void DoClose()
+ {
+ Shutdown();
+ //this.session.Connection.Oneway(removeCommand);
+ }
+
+ /// <summary>
+ /// Called from the parent Session of this Consumer to indicate that its
+ /// parent session is closing and this Consumer should close down but not
+ /// send any message to the Broker as the parent close will take care of
+ /// removing its child resources at the broker.
+ /// </summary>
+ internal void Shutdown()
+ {
+ if(!this.unconsumedMessages.Closed)
+ {
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
+ }
+
+ // Do we have any acks we need to send out before closing?
+ // Ack any delivered messages now.
+ if(!this.session.IsTransacted)
+ {
+ }
+
+ if (this.executor != null)
+ {
+ this.executor.Shutdown();
+ this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
+ this.executor = null;
+ }
+
+ if (this.session.IsClientAcknowledge)
+ {
+ }
+
+ if(!this.session.IsTransacted)
+ {
+ lock(this.dispatchedMessages)
+ {
+ dispatchedMessages.Clear();
+ }
+ }
+
+ this.session.RemoveConsumer(this);
+ this.unconsumedMessages.Close();
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
+ }
+ }
+ }
+
public bool Iterate()
{
if(this.listener != null)
@@ -128,6 +194,146 @@ namespace Apache.NMS.MQTT
return false;
}
+ public IMessage Receive()
+ {
+ CheckClosed();
+ CheckMessageListener();
+
+ MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+
+ if(dispatch == null)
+ {
+ return null;
+ }
+
+ BeforeMessageIsConsumed(dispatch);
+ AfterMessageIsConsumed(dispatch, false);
+
+ return CreateMQTTMessage(dispatch);
+ }
+
+ public IMessage Receive(TimeSpan timeout)
+ {
+ CheckClosed();
+ CheckMessageListener();
+
+ MessageDispatch dispatch = null;
+ dispatch = this.Dequeue(timeout);
+
+ if(dispatch == null)
+ {
+ return null;
+ }
+
+ BeforeMessageIsConsumed(dispatch);
+ AfterMessageIsConsumed(dispatch, false);
+
+ return CreateMQTTMessage(dispatch);
+ }
+
+ public IMessage ReceiveNoWait()
+ {
+ CheckClosed();
+ CheckMessageListener();
+
+ MessageDispatch dispatch = null;
+ dispatch = this.Dequeue(TimeSpan.Zero);
+
+ if(dispatch == null)
+ {
+ return null;
+ }
+
+ BeforeMessageIsConsumed(dispatch);
+ AfterMessageIsConsumed(dispatch, false);
+
+ return CreateMQTTMessage(dispatch);
+ }
+
+ public virtual void Dispatch(MessageDispatch dispatch)
+ {
+ }
+
+ /// <summary>
+ /// Used to get an enqueued message from the unconsumedMessages list. The
+ /// amount of time this method blocks is based on the timeout value. If
+ /// timeout == Timeout.Infinite then it blocks until a message is received.
+ /// If timeout == 0 then it tries not to block at all and returns a
+ /// message only if one is available. If timeout > 0 then it blocks up to
+ /// timeout amount of time. Expired messages will be consumed by this method.
+ /// </summary>
+ /// <param name="timeout">
+ /// A <see cref="System.TimeSpan"/>
+ /// </param>
+ /// <returns>
+ /// A <see cref="MessageDispatch"/>
+ /// </returns>
+ private MessageDispatch Dequeue(TimeSpan timeout)
+ {
+     // The deadline is tracked on the UTC clock: local time (DateTime.Now) jumps
+     // on daylight-saving or time-zone changes, which would stretch or cut
+     // short a timed receive that happens to span such a change.
+     DateTime deadline = DateTime.UtcNow;
+
+     if(timeout > TimeSpan.Zero)
+     {
+         deadline += timeout;
+     }
+
+     while(true)
+     {
+         MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
+
+         // Grab a single date/time for calculations to avoid timing errors.
+         DateTime dispatchTime = DateTime.UtcNow;
+
+         if(dispatch == null)
+         {
+             if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
+             {
+                 if(dispatchTime > deadline)
+                 {
+                     // Out of time: the next pass polls once with a zero
+                     // timeout and then leaves through the branch below.
+                     timeout = TimeSpan.Zero;
+                 }
+                 else
+                 {
+                     // Adjust the timeout to the remaining time.
+                     timeout = deadline - dispatchTime;
+                 }
+             }
+             else
+             {
+                 // Informs the caller of an error in the event that an async exception
+                 // took down the parent connection.
+                 if(this.failureError != null)
+                 {
+                     throw NMSExceptionSupport.Create(this.failureError);
+                 }
+
+                 return null;
+             }
+         }
+         else if(dispatch.Message == null)
+         {
+             // A dispatch that carries no message is reported to the caller as "nothing received".
+             return null;
+         }
+         else
+         {
+             return dispatch;
+         }
+     }
+ }
+
+ public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
+ {
+ }
+
+ public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+ {
+ if(this.unconsumedMessages.Closed)
+ {
+ return;
+ }
+ }
+
private void CheckClosed()
{
if(this.unconsumedMessages.Closed)
@@ -140,7 +346,7 @@ namespace Apache.NMS.MQTT
{
if(this.listener != null)
{
- throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+ throw new NMSException("Cannot perform a Sync receive on a MessageConsumer that has an async listener");
}
}
@@ -159,11 +365,105 @@ namespace Apache.NMS.MQTT
get { return this.session.IsClientAcknowledge; }
}
+ private MQTTMessage CreateMQTTMessage(MessageDispatch dispatch)
+ {
+ MQTTMessage message = dispatch.Message.Clone() as MQTTMessage;
+
+ if(this.ConsumerTransformer != null)
+ {
+ IMessage newMessage = ConsumerTransformer(this.session, this, message);
+ if(newMessage != null)
+ {
+ message = this.messageTransformation.TransformMessage<MQTTMessage>(newMessage);
+ }
+ }
+
+ message.Connection = this.session.Connection;
+
+ if(IsClientAcknowledge)
+ {
+ message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
+ }
+ else if(IsIndividualAcknowledge)
+ {
+ message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge);
+ }
+ else
+ {
+ message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+ }
+
+ return message;
+ }
+
+ protected void DoIndividualAcknowledge(MQTTMessage message)
+ {
+ MessageDispatch dispatch = null;
+
+ lock(this.dispatchedMessages)
+ {
+ foreach(MessageDispatch originalDispatch in this.dispatchedMessages)
+ {
+ if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+ {
+ dispatch = originalDispatch;
+ this.dispatchedMessages.Remove(originalDispatch);
+ break;
+ }
+ }
+ }
+
+ if(dispatch == null)
+ {
+ Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
+ return;
+ }
+
+// MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
+// Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
+// this.session.SendAck(ack);
+ }
+
+ protected void DoNothingAcknowledge(MQTTMessage message)
+ {
+ }
+
+ protected void DoClientAcknowledge(MQTTMessage message)
+ {
+ this.CheckClosed();
+ Tracer.Debug("Sending Client Ack:");
+// this.session.Acknowledge();
+ }
+
internal bool Closed
{
get { return this.unconsumedMessages.Closed; }
}
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ protected void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ try
+ {
+ Close();
+ }
+ catch
+ {
+ // Ignore network errors.
+ }
+
+ disposed = true;
+ }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/MessageProducer.cs Sat Nov 16 21:51:21 2013
@@ -32,6 +32,7 @@ namespace Apache.NMS.MQTT
private TimeSpan requestTimeout;
protected bool disposed = false;
private int producerId;
+ private Topic destination;
private readonly MessageTransformation messageTransformation;
@@ -60,6 +61,41 @@ namespace Apache.NMS.MQTT
set { this.requestTimeout = value; }
}
+ public MsgDeliveryMode DeliveryMode
+ {
+ get { return msgDeliveryMode; }
+ set { this.msgDeliveryMode = value; }
+ }
+
+ public TimeSpan TimeToLive
+ {
+ get { return TimeSpan.MaxValue; }
+ set {}
+ }
+
+ public MsgPriority Priority
+ {
+ get { return MsgPriority.Normal; }
+ set {}
+ }
+
+ public bool DisableMessageID
+ {
+ get { return false; }
+ set {}
+ }
+
+ public bool DisableMessageTimestamp
+ {
+ get { return false; }
+ set {}
+ }
+
+ public Topic Destination
+ {
+ get { return this.destination; }
+ }
+
#endregion
public void Dispose()
@@ -119,7 +155,7 @@ namespace Apache.NMS.MQTT
try
{
- session.RemoveProducer(info.ProducerId);
+ session.RemoveProducer(ProducerId);
}
catch(Exception ex)
{
@@ -132,25 +168,25 @@ namespace Apache.NMS.MQTT
public void Send(IMessage message)
{
- Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+ Send(Destination, message, this.msgDeliveryMode);
}
public void Send(IDestination destination, IMessage message)
{
- Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+ Send(destination, message, this.msgDeliveryMode);
}
public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+ Send(Destination, message, deliveryMode);
}
public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- Send(destination, message, deliveryMode, priority, timeToLive, true);
+ Send(destination, message, deliveryMode);
}
- protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+ protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode)
{
}
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Messages/MQTTMessage.cs Sat Nov 16 21:51:21 2013
@@ -30,6 +30,7 @@ namespace Apache.NMS.MQTT.Messages
private PrimitiveMap properties;
private Connection connection;
private Topic destination;
+ private short messageId;
public event AcknowledgeHandler Acknowledger;
@@ -92,9 +93,6 @@ namespace Apache.NMS.MQTT.Messages
public virtual void ClearProperties()
{
- this.MarshalledProperties = null;
- this.ReadOnlyProperties = false;
- this.Properties.Clear();
}
protected void FailIfReadOnlyBody()
@@ -299,6 +297,12 @@ namespace Apache.NMS.MQTT.Messages
set { }
}
+ /// <summary>
+ /// Identifier of this message, exposed as an int and stored in the
+ /// 16-bit messageId field.
+ /// </summary>
+ public int MessageId
+ {
+     get { return this.messageId; }
+     // The backing field is a short, so the int has to be narrowed explicitly;
+     // without the cast this assignment does not compile (CS0266).
+     // NOTE(review): values outside the short range are truncated by the cast
+     // (or throw if the project is built with overflow checking) - confirm
+     // the intended id range against the callers.
+     set { this.messageId = (short) value; }
+ }
+
#endregion
public object GetObjectProperty(string name)
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Session.cs Sat Nov 16 21:51:21 2013
@@ -438,14 +438,14 @@ namespace Apache.NMS.MQTT
}
}
-// public void RemoveProducer(ProducerId objectId)
-// {
-// connection.RemoveProducer(objectId);
-// if(!this.closing)
-// {
-// producers.Remove(objectId);
-// }
-// }
+ public void RemoveProducer(int producerId)
+ {
+ connection.RemoveProducer(producerId);
+ if(!this.closing)
+ {
+ producers.Remove(producerId);
+ }
+ }
public void Dispatch(MessageDispatch dispatch)
{
Modified: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs?rev=1542594&r1=1542593&r2=1542594&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/SessionExecutor.cs Sat Nov 16 21:51:21 2013
@@ -35,15 +35,7 @@ namespace Apache.NMS.MQTT
{
this.session = session;
this.consumers = consumers;
-
- if(this.session.Connection != null && this.session.Connection.MessagePrioritySupported)
- {
- this.messageQueue = new SimplePriorityMessageDispatchChannel();
- }
- else
- {
- this.messageQueue = new FifoMessageDispatchChannel();
- }
+ this.messageQueue = new FifoMessageDispatchChannel();
}
~SessionExecutor()
@@ -146,10 +138,10 @@ namespace Apache.NMS.MQTT
lock(this.consumers.SyncRoot)
{
- if(this.consumers.Contains(dispatch.ConsumerId))
- {
- consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
- }
+// if(this.consumers.Contains(dispatch.ConsumerId))
+// {
+// consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
+// }
}
// If the consumer is not available, just ignore the message.
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=1542594&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs Sat Nov 16 21:51:21 2013
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MQTT.Threads
+{
+ /// <summary>
+ /// Holds the one TaskRunnerFactory instance that is created when this type
+ /// is initialized and hands it out through a static property.
+ /// </summary>
+ public class DefaultThreadPools
+ {
+     // Created once by the static field initializer and never replaced.
+     private static readonly TaskRunnerFactory sharedFactory = new TaskRunnerFactory();
+
+     /// <summary>
+     /// The shared TaskRunnerFactory instance.
+     /// </summary>
+     public static TaskRunnerFactory DefaultTaskRunnerFactory
+     {
+         get
+         {
+             return sharedFactory;
+         }
+     }
+
+     // Private so that no instance of this holder can be created from outside.
+     private DefaultThreadPools()
+     {
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=1542594&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Sat Nov 16 21:51:21 2013
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.MQTT.Threads
+{
+ /// <summary>
+ /// TaskRunner that executes a single Task on the .NET ThreadPool. Each run
+ /// calls task.Iterate() at most maxIterationsPerRun times; when the task
+ /// did not report completion within that budget, or Wakeup() was called in
+ /// the meantime, another run is queued. Outside the constructor the
+ /// queued / iterating / _shutdown flags are only touched while holding the
+ /// private "runable" lock.
+ /// </summary>
+ class PooledTaskRunner : TaskRunner
+ {
+     private readonly int maxIterationsPerRun;
+     private readonly Task task;
+     private readonly Object runable = new Object();
+     private bool queued;
+     private bool _shutdown;
+     private bool iterating;
+     private volatile System.Threading.Thread runningThread;
+
+     /// <summary>
+     /// ThreadPool callback: records the executing thread, performs one run of
+     /// the task and clears the thread reference again.
+     /// </summary>
+     /// <param name="o">
+     /// The runner to execute. Every call site in this class passes "this";
+     /// anything that is not a PooledTaskRunner falls back to this instance
+     /// instead of failing with a NullReferenceException.
+     /// </param>
+     public void Run(Object o)
+     {
+         PooledTaskRunner p = (o as PooledTaskRunner) ?? this;
+         p.runningThread = System.Threading.Thread.CurrentThread;
+         try
+         {
+             p.RunTask();
+         }
+         finally
+         {
+             p.runningThread = null;
+         }
+     }
+
+     /// <summary>
+     /// Creates the runner and immediately queues a first run on the ThreadPool.
+     /// </summary>
+     /// <param name="task">The task whose Iterate() method is driven by this runner.</param>
+     /// <param name="maxIterationsPerRun">Upper bound of Iterate() calls per run.</param>
+     public PooledTaskRunner(Task task, int maxIterationsPerRun)
+     {
+         this.maxIterationsPerRun = maxIterationsPerRun;
+         this.task = task;
+         this._shutdown = false;
+         this.iterating = false;
+         this.queued = true;
+         ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+     }
+
+     /// <summary>
+     /// We Expect MANY wakeup calls on the same TaskRunner.
+     /// </summary>
+     public void Wakeup()
+     {
+         lock(runable)
+         {
+             // When we get in here, we make some assumptions of state:
+             // queued=false, iterating=false: wakeup() has not been called and
+             // therefore task is not executing.
+             // queued=true, iterating=false: wakeup() was called but, task
+             // execution has not started yet
+             // queued=false, iterating=true : wakeup() was called, which caused
+             // task execution to start.
+             // queued=true, iterating=true : wakeup() called after task
+             // execution was started.
+
+             if(queued || _shutdown)
+             {
+                 return;
+             }
+
+             queued = true;
+
+             // The runTask() method will do this for me once we are done
+             // iterating.
+             if(!iterating)
+             {
+                 ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+             }
+         }
+     }
+
+     /// <summary>
+     /// Marks the runner as shut down and, when a run is in progress on another
+     /// thread, waits up to the given timeout for that run to finish.
+     /// </summary>
+     /// <param name="timeout">
+     /// Maximum time to wait; a value of -1 millisecond waits without limit.
+     /// </param>
+     public void Shutdown(TimeSpan timeout)
+     {
+         lock(runable)
+         {
+             _shutdown = true;
+             // the check on the thread is done
+             // because a call to iterate can result in
+             // shutDown() being called, which would wait forever
+             // waiting for iterating to finish
+             if(runningThread != System.Threading.Thread.CurrentThread)
+             {
+                 if(iterating)
+                 {
+                     // Monitor.Wait releases the lock while waiting, so RunTask()
+                     // can enter its final lock block, clear "iterating" and pulse
+                     // us. Sleeping here instead would keep the lock held and
+                     // stall both RunTask() and Wakeup() for the whole timeout.
+                     Monitor.Wait(runable, timeout);
+                 }
+             }
+         }
+     }
+
+     /// <summary>
+     /// Like Shutdown(TimeSpan), but aborts the thread executing the task when
+     /// the run is still in progress after the wait.
+     /// </summary>
+     /// <param name="timeout">Maximum time to wait before aborting.</param>
+     public void ShutdownWithAbort(TimeSpan timeout)
+     {
+         lock(runable)
+         {
+             _shutdown = true;
+
+             if(runningThread != System.Threading.Thread.CurrentThread)
+             {
+                 if(iterating)
+                 {
+                     // See Shutdown(TimeSpan): wait without holding the lock.
+                     Monitor.Wait(runable, timeout);
+                 }
+
+                 // Read the volatile field once; Run() resets it to null outside
+                 // of the lock, so it may already be gone at this point.
+                 System.Threading.Thread worker = runningThread;
+                 if(iterating && worker != null)
+                 {
+                     worker.Abort();
+                 }
+             }
+         }
+     }
+
+     /// <summary>
+     /// Shuts the runner down, waiting without a time limit for a run that is
+     /// in progress on another thread.
+     /// </summary>
+     public void Shutdown()
+     {
+         // new TimeSpan(Timeout.Infinite) would be -1 *tick*, which rounds to a
+         // zero millisecond wait; -1 millisecond is the "infinite" value.
+         Shutdown(TimeSpan.FromMilliseconds(Timeout.Infinite));
+     }
+
+     /// <summary>
+     /// Performs one run: up to maxIterationsPerRun calls of task.Iterate(),
+     /// then either re-queues itself or, after a shutdown request, wakes up the
+     /// threads waiting in Shutdown / ShutdownWithAbort.
+     /// </summary>
+     internal void RunTask()
+     {
+         lock(runable)
+         {
+             queued = false;
+             if(_shutdown)
+             {
+                 iterating = false;
+                 return;
+             }
+             iterating = true;
+         }
+
+         // Don't synchronize while we are iterating so that
+         // multiple wakeup() calls can be executed concurrently.
+         bool done = false;
+         try
+         {
+             for(int i = 0; i < maxIterationsPerRun; i++)
+             {
+                 if(!task.Iterate())
+                 {
+                     done = true;
+                     break;
+                 }
+             }
+         }
+         finally
+         {
+             lock(runable)
+             {
+                 iterating = false;
+                 if(_shutdown)
+                 {
+                     queued = false;
+                     // Release any thread blocked in Shutdown / ShutdownWithAbort.
+                     Monitor.PulseAll(runable);
+                 }
+                 else
+                 {
+                     // If we could not iterate all the items
+                     // then we need to re-queue.
+                     if(!done)
+                     {
+                         queued = true;
+                     }
+
+                     if(queued)
+                     {
+                         ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
+                     }
+                 }
+             }
+         }
+     }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.MQTT/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
------------------------------------------------------------------------------
svn:eol-style = native