You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/06 23:29:44 UTC

[20/50] [abbrv] activemq-nms-stomp git commit: Merge all fixes from trunk in preparation for a v1.5.1 release.

Merge all fixes from trunk in preparation for a v1.5.1 release.


Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/c0368061
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/c0368061
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/c0368061

Branch: refs/heads/1.5.x
Commit: c0368061e8b6bfa688406f27b2d55026832bdc51
Parents: 4d2a816
Author: Timothy A. Bish <ta...@apache.org>
Authored: Wed Jan 26 21:25:30 2011 +0000
Committer: Timothy A. Bish <ta...@apache.org>
Committed: Wed Jan 26 21:25:30 2011 +0000

----------------------------------------------------------------------
 src/main/csharp/Connection.cs                   | 145 ++++++++++---
 src/main/csharp/Threads/ThreadPoolExecutor.cs   | 167 +++++++++++++++
 src/main/csharp/Transport/InactivityMonitor.cs  |  49 +++--
 src/main/csharp/Util/MessageDispatchChannel.cs  |  21 +-
 .../csharp/Threads/ThreadPoolExecutorTest.cs    | 202 +++++++++++++++++++
 vs2008-stomp-test.csproj                        |   1 +
 vs2008-stomp.csproj                             |   1 +
 7 files changed, 534 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 6ce7003..ffaf180 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -20,6 +20,7 @@ using System.Collections;
 using System.Collections.Specialized;
 using System.Threading;
 using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Threads;
 using Apache.NMS.Stomp.Transport;
 using Apache.NMS.Stomp.Util;
 using Apache.NMS.Util;
@@ -52,9 +53,11 @@ namespace Apache.NMS.Stomp
         private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
         private readonly object myLock = new object();
-        private bool connected = false;
-        private bool closed = false;
-        private bool closing = false;
+        private readonly Atomic<bool> connected = new Atomic<bool>(false);
+        private readonly Atomic<bool> closed = new Atomic<bool>(false);
+        private readonly Atomic<bool> closing = new Atomic<bool>(false);
+        private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+        private Exception firstFailureError = null;
         private int sessionCounter = 0;
         private int temporaryDestinationCounter = 0;
         private int localTransactionCounter;
@@ -64,6 +67,7 @@ namespace Apache.NMS.Stomp
         private readonly IdGenerator clientIdGenerator;
         private CountDownLatch transportInterruptionProcessingComplete;
         private readonly MessageTransformation messageTransformation;
+        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
@@ -72,7 +76,7 @@ namespace Apache.NMS.Stomp
 
             this.transport = transport;
             this.transport.Command = new CommandHandler(OnCommand);
-            this.transport.Exception = new ExceptionHandler(OnException);
+            this.transport.Exception = new ExceptionHandler(OnTransportException);
             this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
             this.transport.Resumed = new ResumedHandler(OnTransportResumed);
 
@@ -215,6 +219,16 @@ namespace Apache.NMS.Stomp
             set { this.transport = value; }
         }
 
+        public bool TransportFailed
+        {
+            get { return this.transportFailed.Value; }
+        }
+
+        public Exception FirstFailureError
+        {
+            get { return this.firstFailureError; }
+        }
+
         public TimeSpan RequestTimeout
         {
             get { return this.requestTimeout; }
@@ -232,7 +246,7 @@ namespace Apache.NMS.Stomp
             get { return info.ClientId; }
             set
             {
-                if(this.connected)
+                if(this.connected.Value)
                 {
                     throw new NMSException("You cannot change the ClientId once the Connection is connected");
                 }
@@ -384,7 +398,7 @@ namespace Apache.NMS.Stomp
 
         internal void RemoveSession(Session session)
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 sessions.Remove(session);
             }
@@ -404,7 +418,7 @@ namespace Apache.NMS.Stomp
         {
             lock(myLock)
             {
-                if(this.closed)
+                if(this.closed.Value)
                 {
                     return;
                 }
@@ -412,7 +426,7 @@ namespace Apache.NMS.Stomp
                 try
                 {
                     Tracer.Info("Closing Connection.");
-                    this.closing = true;
+                    this.closing.Value = true;
                     lock(sessions.SyncRoot)
                     {
                         foreach(Session session in sessions)
@@ -422,7 +436,7 @@ namespace Apache.NMS.Stomp
                     }
                     sessions.Clear();
 
-                    if(connected)
+                    if(connected.Value)
                     {
                         ShutdownInfo shutdowninfo = new ShutdownInfo();
                         transport.Oneway(shutdowninfo);
@@ -438,9 +452,9 @@ namespace Apache.NMS.Stomp
                 finally
                 {
                     this.transport = null;
-                    this.closed = true;
-                    this.connected = false;
-                    this.closing = false;
+                    this.closed.Value = true;
+                    this.connected.Value = false;
+                    this.closing.Value = false;
                 }
             }
         }
@@ -534,24 +548,24 @@ namespace Apache.NMS.Stomp
 
         protected void CheckConnected()
         {
-            if(closed)
+            if(closed.Value)
             {
                 throw new ConnectionClosedException();
             }
 
-            if(!connected)
+            if(!connected.Value)
             {
                 if(!this.userSpecifiedClientID)
                 {
                     this.info.ClientId = this.clientIdGenerator.GenerateId();
                 }
 
-                connected = true;
+                connected.Value = true;
                 // now lets send the connection and see if we get an ack/nak
                 if(null == SyncRequest(info))
                 {
-                    closed = true;
-                    connected = false;
+                    closed.Value = true;
+                    connected.Value = false;
                     throw new ConnectionClosedException();
                 }
             }
@@ -581,7 +595,7 @@ namespace Apache.NMS.Stomp
             }
             else if(command.IsErrorCommand)
             {
-                if(!closing && !closed)
+                if(!closing.Value && !closed.Value)
                 {
                     ConnectionError connectionError = (ConnectionError) command;
                     BrokerError brokerError = connectionError.Exception;
@@ -597,7 +611,7 @@ namespace Apache.NMS.Stomp
                         }
                     }
 
-                    OnException(commandTransport, new NMSConnectionException(message, cause));
+                    OnException(new NMSConnectionException(message, cause));
                 }
             }
             else
@@ -632,17 +646,85 @@ namespace Apache.NMS.Stomp
             Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
         }
 
-        protected void OnException(ITransport sender, Exception exception)
+        protected void OnTransportException(ITransport sender, Exception exception)
+        {
+            this.OnException(exception);
+        }
+
+        internal void OnAsyncException(Exception error)
+        {
+            if(!this.closed.Value && !this.closing.Value)
+            {
+                if(this.ExceptionListener != null)
+                {
+                    if(!(error is NMSException))
+                    {
+                        error = NMSExceptionSupport.Create(error);
+                    }
+                    NMSException e = (NMSException)error;
+
+                    // Called in another thread so that processing can continue
+                    // here, ensures no lock contention.
+                    executor.QueueUserWorkItem(AsyncCallExceptionListener, e);
+                }
+                else
+                {
+                    Tracer.Debug("Async exception with no exception listener: " + error);
+                }
+            }
+        }
+
+        private void AsyncCallExceptionListener(object error)
+        {
+            NMSException exception = error as NMSException;
+            this.ExceptionListener(exception);
+        }
+
+        internal void OnException(Exception error)
+        {
+            // Will fire an exception listener callback if there's any set.
+            OnAsyncException(error);
+
+            if(!this.closing.Value && !this.closed.Value)
+            {
+                // Perform the actual work in another thread to avoid lock contention
+                // and allow the caller to continue on in its error cleanup.
+                executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
+            }
+        }
+
+        private void AsyncOnExceptionHandler(object error)
         {
-            if(ExceptionListener != null && !this.closing)
+            Exception cause = error as Exception;
+
+            MarkTransportFailed(cause);
+
+            try
+            {
+                this.transport.Dispose();
+            }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Caught Exception While disposing of Transport: " + ex);
+            }
+
+            IList sessionsCopy = null;
+            lock(this.sessions.SyncRoot)
+            {
+                sessionsCopy = new ArrayList(this.sessions);
+            }
+
+            // Use a copy so we don't concurrently modify the Sessions list if the
+            // client is closing at the same time.
+            foreach(Session session in sessionsCopy)
             {
                 try
                 {
-                    ExceptionListener(exception);
+                    session.Dispose();
                 }
-                catch
+                catch(Exception ex)
                 {
-                    sender.Dispose();
+                    Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
                 }
             }
         }
@@ -662,7 +744,7 @@ namespace Apache.NMS.Stomp
                 session.ClearMessagesInProgress();
             }
 
-            if(this.ConnectionInterruptedListener != null && !this.closing)
+            if(this.ConnectionInterruptedListener != null && !this.closing.Value)
             {
                 try
                 {
@@ -678,7 +760,7 @@ namespace Apache.NMS.Stomp
         {
             Tracer.Debug("Transport has resumed normal operation.");
 
-            if(this.ConnectionResumedListener != null && !this.closing)
+            if(this.ConnectionResumedListener != null && !this.closing.Value)
             {
                 try
                 {
@@ -705,6 +787,15 @@ namespace Apache.NMS.Stomp
             }
         }
 
+        private void MarkTransportFailed(Exception error)
+        {
+            this.transportFailed.Value = true;
+            if(this.firstFailureError == null)
+            {
+                this.firstFailureError = error;
+            }
+        }
+
         /// <summary>
         /// Creates a new temporary destination name
         /// </summary>
@@ -739,7 +830,7 @@ namespace Apache.NMS.Stomp
             CountDownLatch cdl = this.transportInterruptionProcessingComplete;
             if(cdl != null)
             {
-                if(!closed && cdl.Remaining > 0)
+                if(!closed.Value && cdl.Remaining > 0)
                 {
                     Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
                                 "processing (" + cdl.Remaining + ") to complete..");

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Threads/ThreadPoolExecutor.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Threads/ThreadPoolExecutor.cs b/src/main/csharp/Threads/ThreadPoolExecutor.cs
new file mode 100644
index 0000000..7072dfe
--- /dev/null
+++ b/src/main/csharp/Threads/ThreadPoolExecutor.cs
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+    /// <summary>
+    /// This class provides a wrapper around the ThreadPool mechanism in .NET
+    /// to allow for serial execution of jobs in the ThreadPool and provide
+    /// a means of shutting down the execution of jobs in a deterministic
+    /// way.
+    /// </summary>
+    public class ThreadPoolExecutor
+    {
+        private Queue<Future> workQueue = new Queue<Future>();
+        private Mutex syncRoot = new Mutex();
+        private bool running = false;
+        private bool closing = false;
+        private bool closed = false;
+        private ManualResetEvent executionComplete = new ManualResetEvent(true);
+
+        /// <summary>
+        /// Represents an asynchronous task that is executed on the ThreadPool
+        /// at some point in the future.
+        /// </summary>
+        internal class Future
+        {
+            private WaitCallback callback;
+            private object callbackArg;
+
+            public Future(WaitCallback callback, object arg)
+            {
+                this.callback = callback;
+                this.callbackArg = arg;
+            }
+
+            public void Run()
+            {
+                if(this.callback == null)
+                {
+                    throw new Exception("Future executed with null WaitCallback");
+                }
+
+                try
+                {
+                    this.callback(callbackArg);
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        public void QueueUserWorkItem(WaitCallback worker)
+        {
+            this.QueueUserWorkItem(worker, null);
+        }
+
+        public void QueueUserWorkItem(WaitCallback worker, object arg)
+        {
+            if(worker == null)
+            {
+                throw new ArgumentNullException("Invalid WaitCallback passed");
+            }
+
+            if(!this.closed)
+            {
+                lock(syncRoot)
+                {
+                    if(!this.closed || !this.closing)
+                    {
+                        this.workQueue.Enqueue(new Future(worker, arg));
+
+                        if(!this.running)
+                        {
+                            this.executionComplete.Reset();
+                            this.running = true;
+                            ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                        }
+                    }
+                }
+            }
+        }
+
+        public bool IsShutdown
+        {
+            get { return this.closed; }
+        }
+
+        public void Shutdown()
+        {
+            if(!this.closed)
+            {
+                syncRoot.WaitOne();
+
+                if(!this.closed)
+                {
+                    this.closing = true;
+                    this.workQueue.Clear();
+
+                    if(this.running)
+                    {
+                        syncRoot.ReleaseMutex();
+                        this.executionComplete.WaitOne();
+                        syncRoot.WaitOne();
+                    }
+
+                    this.closed = true;
+                }
+
+                syncRoot.ReleaseMutex();
+            }
+        }
+
+        private void QueueProcessor(object unused)
+        {
+            Future theTask = null;
+
+            lock(syncRoot)
+            {
+                if(this.workQueue.Count == 0 || this.closing)
+                {
+                    this.running = false;
+                    this.executionComplete.Set();
+                    return;
+                }
+
+                theTask = this.workQueue.Dequeue();
+            }
+
+            try
+            {
+                theTask.Run();
+            }
+            finally
+            {
+                if(this.closing)
+                {
+                    this.running = false;
+                    this.executionComplete.Set();
+                }
+                else
+                {
+                    ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                }
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Transport/InactivityMonitor.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/InactivityMonitor.cs b/src/main/csharp/Transport/InactivityMonitor.cs
index 8fb5c95..1614394 100644
--- a/src/main/csharp/Transport/InactivityMonitor.cs
+++ b/src/main/csharp/Transport/InactivityMonitor.cs
@@ -45,6 +45,9 @@ namespace Apache.NMS.Stomp.Transport
         private AsyncWriteTask asyncWriteTask;
 
         private readonly Mutex monitor = new Mutex();
+
+        private static int id = 0;
+        private readonly int instanceId = 0;
         private bool disposing = false;
 
         private Timer connectionCheckTimer;
@@ -83,7 +86,8 @@ namespace Apache.NMS.Stomp.Transport
         public InactivityMonitor(ITransport next)
             : base(next)
         {
-            Tracer.Debug("Creating Inactivity Monitor");
+            this.instanceId = ++id;
+            Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
         }
 
         ~InactivityMonitor()
@@ -98,8 +102,14 @@ namespace Apache.NMS.Stomp.Transport
                 // get rid of unmanaged stuff
             }
 
-            this.disposing = true;
-            StopMonitorThreads();
+            lock(monitor)
+            {
+                this.localWireFormatInfo = null;
+                this.remoteWireFormatInfo = null;
+                this.disposing = true;
+                StopMonitorThreads();
+            }
+
             base.Dispose(disposing);
         }
 
@@ -123,19 +133,19 @@ namespace Apache.NMS.Stomp.Transport
         {
             if(this.inWrite.Value || this.failed.Value)
             {
-                Tracer.Debug("Inactivity Monitor is in write or already failed.");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
                 return;
             }
 
             if(!commandSent.Value)
             {
-                //Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
                 this.asyncWriteTask.IsPending = true;
                 this.asyncTasks.Wakeup();
             }
             else
             {
-                Tracer.Debug("Message sent since last write check. Resetting flag");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId);
             }
 
             commandSent.Value = false;
@@ -150,6 +160,7 @@ namespace Apache.NMS.Stomp.Transport
 
             if(!AllowReadCheck(elapsed))
             {
+                Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed.");
                 return;
             }
 
@@ -157,12 +168,13 @@ namespace Apache.NMS.Stomp.Transport
 
             if(this.inRead.Value || this.failed.Value || this.asyncErrorTask == null)
             {
+                Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId);
                 return;
             }
 
             if(!commandReceived.Value)
             {
-                Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId);
                 this.asyncErrorTask.IsPending = true;
                 this.asyncTasks.Wakeup();
             }
@@ -215,9 +227,9 @@ namespace Apache.NMS.Stomp.Transport
                 {
                     if(Tracer.IsDebugEnabled)
                     {
-                        Tracer.Debug("InactivityMonitor: New Keep Alive Received at -> " +
-                                     DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
-                                     "." + DateTime.Now.Millisecond);
+                        Tracer.DebugFormat("InactivityMonitor[{0}]: New Keep Alive Received at -> " +
+                                           DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
+                                           "." + DateTime.Now.Millisecond, instanceId);
                     }
                 }
 
@@ -325,27 +337,29 @@ namespace Apache.NMS.Stomp.Transport
 
                 initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay;
 
-                Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime );
-                Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime );
-                Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime );
+                Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+                                   instanceId, readCheckTime );
+                Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+                                   instanceId, initialDelayTime );
 
                 this.asyncTasks = new CompositeTaskRunner();
 
                 if(this.asyncErrorTask != null)
                 {
-                    Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner.");
+                    Tracer.DebugFormat("InactivityMonitor[{0}]: Adding the Async Read Check Task to the Runner.", instanceId);
                     this.asyncTasks.AddTask(this.asyncErrorTask);
                 }
 
                 if(this.asyncWriteTask != null)
                 {
-                    Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner.");
+                    Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
+                                       instanceId, writeCheckTime );
                     this.asyncTasks.AddTask(this.asyncWriteTask);
                 }
 
                 if(this.asyncErrorTask != null || this.asyncWriteTask != null)
                 {
-                    Tracer.Debug("Inactivity: Starting the Monitor Timer.");
+                    Tracer.DebugFormat("InactivityMonitor[{0}]: Starting the Monitor Timer.", instanceId);
                     monitorStarted.Value = true;
 
                     this.connectionCheckTimer = new Timer(
@@ -427,10 +441,13 @@ namespace Apache.NMS.Stomp.Transport
 
             public bool Iterate()
             {
+                Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
                 if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
                 {
                     try
                     {
+                        Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
+                                           parent.instanceId);
                         KeepAliveInfo info = new KeepAliveInfo();
                         this.parent.next.Oneway(info);
                     }

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Util/MessageDispatchChannel.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/MessageDispatchChannel.cs b/src/main/csharp/Util/MessageDispatchChannel.cs
index a1d7cfc..64bd7aa 100644
--- a/src/main/csharp/Util/MessageDispatchChannel.cs
+++ b/src/main/csharp/Util/MessageDispatchChannel.cs
@@ -25,16 +25,13 @@ namespace Apache.NMS.Stomp.Util
     public class MessageDispatchChannel
     {
         private readonly Mutex mutex = new Mutex();
-        private readonly ManualResetEvent wakeAll = new ManualResetEvent(false);
-        private readonly AutoResetEvent waiter = new AutoResetEvent(false);
-        private WaitHandle[] waiters;
+        private readonly ManualResetEvent waiter = new ManualResetEvent(false);
         private bool closed;
         private bool running;
         private readonly LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
 
         public MessageDispatchChannel()
         {
-            this.waiters = new WaitHandle[] { this.waiter, this.wakeAll };
         }
 
         #region Properties
@@ -113,7 +110,7 @@ namespace Apache.NMS.Stomp.Util
                 if(!Closed)
                 {
                     this.running = true;
-                    this.wakeAll.Reset();
+                    this.waiter.Reset();
                 }
             }
         }
@@ -123,7 +120,7 @@ namespace Apache.NMS.Stomp.Util
             lock(mutex)
             {
                 this.running = false;
-                this.wakeAll.Set();
+                this.waiter.Set();
             }
         }
 
@@ -137,7 +134,7 @@ namespace Apache.NMS.Stomp.Util
                     this.closed = true;
                 }
 
-                this.wakeAll.Set();
+                this.waiter.Set();
             }
         }
 
@@ -168,9 +165,15 @@ namespace Apache.NMS.Stomp.Util
             // Wait until the channel is ready to deliver messages.
             if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running ) )
             {
-                this.mutex.ReleaseMutex();
+                // This isn't the greatest way to do this but to work on the
+                // .NETCF its the only solution I could find so far.  This
+                // code will only really work for one Thread using the event
+                // channel to wait as all waiters are going to drop out of
+                // here regardless of the fact that only one message could
+                // be on the Queue.  
                 this.waiter.Reset();
-                ThreadUtil.WaitAny(this.waiters, (int)timeout.TotalMilliseconds, false);
+                this.mutex.ReleaseMutex();
+                this.waiter.WaitOne((int)timeout.TotalMilliseconds, false);
                 this.mutex.WaitOne();
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Threads/ThreadPoolExecutorTest.cs b/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
new file mode 100644
index 0000000..1da7241
--- /dev/null
+++ b/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.Stomp.Threads;
+using NUnit.Framework;
+
+namespace Apache.NMS.Stomp.Test
+{
+    [TestFixture]
+    public class ThreadPoolExecutorTest
+    {
+        private const int JOB_COUNT = 100;
+        private ManualResetEvent complete = new ManualResetEvent(false);
+        private bool waitingTaskCompleted = false;
+        private CountDownLatch doneLatch;
+        private int count = 0;
+
+        internal class DummyClass
+        {
+            private int data;
+
+            public DummyClass(int data)
+            {
+                this.data = data;
+            }
+
+            public int Data
+            {
+                get { return data; }
+            }
+        }
+
+        public ThreadPoolExecutorTest()
+        {
+        }
+
+        private void TaskThatSignalsWhenItsComplete(object unused)
+        {
+            waitingTaskCompleted = true;
+            complete.Set();
+        }
+
+        private void TaskThatCountsDown(object unused)
+        {
+            doneLatch.countDown();
+        }
+
+        private void TaskThatSleeps(object unused)
+        {
+            Thread.Sleep(5000);
+        }
+
+        private void TaskThatIncrementsCount(object unused)
+        {
+            count++;
+        }
+
+        private void TaskThatThrowsAnException(object unused)
+        {
+            throw new Exception("Throwing an Exception just because");
+        }
+
+        private void TaskThatValidatesTheArg(object arg)
+        {
+            DummyClass state = arg as DummyClass;
+            if(state != null && state.Data == 10 )
+            {
+                waitingTaskCompleted = true;
+            }
+            complete.Set();
+        }
+
+        [SetUp]
+        public void SetUp()
+        {
+            this.complete.Reset();
+            this.waitingTaskCompleted = false;
+            this.doneLatch = new CountDownLatch(JOB_COUNT);
+            this.count = 0;
+        }
+
+        [Test]
+        public void TestConstructor()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestSingleTaskExecuted()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatSignalsWhenItsComplete);
+
+            this.complete.WaitOne();
+            Assert.IsTrue(this.waitingTaskCompleted);
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestTaskParamIsPropagated()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatValidatesTheArg, new DummyClass(10));
+
+            this.complete.WaitOne();
+            Assert.IsTrue(this.waitingTaskCompleted);
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestAllTasksComplete()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            for(int i = 0; i < JOB_COUNT; ++i)
+            {
+                executor.QueueUserWorkItem(TaskThatCountsDown);
+            }
+
+            Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestAllTasksCompleteAfterException()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatThrowsAnException);
+
+            for(int i = 0; i < JOB_COUNT; ++i)
+            {
+                executor.QueueUserWorkItem(TaskThatCountsDown);
+            }
+
+            Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
+
+            executor.Shutdown();
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+        [Test]
+        public void TestThatShutdownPurgesTasks()
+        {
+            ThreadPoolExecutor executor = new ThreadPoolExecutor();
+            Assert.IsNotNull(executor);
+            Assert.IsFalse(executor.IsShutdown);
+
+            executor.QueueUserWorkItem(TaskThatSleeps);
+
+            for(int i = 0; i < JOB_COUNT; ++i)
+            {
+                executor.QueueUserWorkItem(TaskThatIncrementsCount);
+            }
+
+            Thread.Sleep(100);
+
+            executor.Shutdown();
+            Assert.AreEqual(0, count);
+            Assert.IsTrue(executor.IsShutdown);
+        }
+
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/vs2008-stomp-test.csproj
----------------------------------------------------------------------
diff --git a/vs2008-stomp-test.csproj b/vs2008-stomp-test.csproj
index b093a81..ffb03ae 100644
--- a/vs2008-stomp-test.csproj
+++ b/vs2008-stomp-test.csproj
@@ -101,6 +101,7 @@
     <Compile Include="src\test\csharp\StompHelperTest.cs" />
     <Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" />
     <Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />
+    <Compile Include="src\test\csharp\Threads\ThreadPoolExecutorTest.cs" />
     <Compile Include="src\test\csharp\Util\MessageDispatchChannelTest.cs" />
     <Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />
     <Compile Include="src\test\csharp\StompTopicTransactionTest.cs" />

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/vs2008-stomp.csproj
----------------------------------------------------------------------
diff --git a/vs2008-stomp.csproj b/vs2008-stomp.csproj
index 0b332d4..6921324 100644
--- a/vs2008-stomp.csproj
+++ b/vs2008-stomp.csproj
@@ -110,6 +110,7 @@
     <Compile Include="src\main\csharp\RequestTimedOutException.cs" />
     <Compile Include="src\main\csharp\Threads\CompositeTask.cs" />
     <Compile Include="src\main\csharp\Threads\CompositeTaskRunner.cs" />
+    <Compile Include="src\main\csharp\Threads\ThreadPoolExecutor.cs" />
     <Compile Include="src\main\csharp\Transport\Tcp\TcpTransport.cs" />
     <Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs" />
     <Compile Include="src\main\csharp\Transport\FutureResponse.cs" />