You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/06 23:29:44 UTC
[20/50] [abbrv] activemq-nms-stomp git commit: Merge all fixes from
trunk in preparation for a v1.5.1 release.
Merge all fixes from trunk in preparation for a v1.5.1 release.
Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/c0368061
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/c0368061
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/c0368061
Branch: refs/heads/1.5.x
Commit: c0368061e8b6bfa688406f27b2d55026832bdc51
Parents: 4d2a816
Author: Timothy A. Bish <ta...@apache.org>
Authored: Wed Jan 26 21:25:30 2011 +0000
Committer: Timothy A. Bish <ta...@apache.org>
Committed: Wed Jan 26 21:25:30 2011 +0000
----------------------------------------------------------------------
src/main/csharp/Connection.cs | 145 ++++++++++---
src/main/csharp/Threads/ThreadPoolExecutor.cs | 167 +++++++++++++++
src/main/csharp/Transport/InactivityMonitor.cs | 49 +++--
src/main/csharp/Util/MessageDispatchChannel.cs | 21 +-
.../csharp/Threads/ThreadPoolExecutorTest.cs | 202 +++++++++++++++++++
vs2008-stomp-test.csproj | 1 +
vs2008-stomp.csproj | 1 +
7 files changed, 534 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 6ce7003..ffaf180 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -20,6 +20,7 @@ using System.Collections;
using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Threads;
using Apache.NMS.Stomp.Transport;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Util;
@@ -52,9 +53,11 @@ namespace Apache.NMS.Stomp
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
- private bool connected = false;
- private bool closed = false;
- private bool closing = false;
+ private readonly Atomic<bool> connected = new Atomic<bool>(false);
+ private readonly Atomic<bool> closed = new Atomic<bool>(false);
+ private readonly Atomic<bool> closing = new Atomic<bool>(false);
+ private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+ private Exception firstFailureError = null;
private int sessionCounter = 0;
private int temporaryDestinationCounter = 0;
private int localTransactionCounter;
@@ -64,6 +67,7 @@ namespace Apache.NMS.Stomp
private readonly IdGenerator clientIdGenerator;
private CountDownLatch transportInterruptionProcessingComplete;
private readonly MessageTransformation messageTransformation;
+ private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -72,7 +76,7 @@ namespace Apache.NMS.Stomp
this.transport = transport;
this.transport.Command = new CommandHandler(OnCommand);
- this.transport.Exception = new ExceptionHandler(OnException);
+ this.transport.Exception = new ExceptionHandler(OnTransportException);
this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
this.transport.Resumed = new ResumedHandler(OnTransportResumed);
@@ -215,6 +219,16 @@ namespace Apache.NMS.Stomp
set { this.transport = value; }
}
+ /// <summary>
+ /// Gets the current value of the transportFailed flag. The flag starts
+ /// out false and is set to true by MarkTransportFailed.
+ /// </summary>
+ public bool TransportFailed
+ {
+ get { return this.transportFailed.Value; }
+ }
+
+ /// <summary>
+ /// Gets the error stored by MarkTransportFailed the first time it was
+ /// called, or null when no failure has been recorded.
+ /// </summary>
+ public Exception FirstFailureError
+ {
+ get { return this.firstFailureError; }
+ }
+
public TimeSpan RequestTimeout
{
get { return this.requestTimeout; }
@@ -232,7 +246,7 @@ namespace Apache.NMS.Stomp
get { return info.ClientId; }
set
{
- if(this.connected)
+ if(this.connected.Value)
{
throw new NMSException("You cannot change the ClientId once the Connection is connected");
}
@@ -384,7 +398,7 @@ namespace Apache.NMS.Stomp
internal void RemoveSession(Session session)
{
- if(!this.closing)
+ if(!this.closing.Value)
{
sessions.Remove(session);
}
@@ -404,7 +418,7 @@ namespace Apache.NMS.Stomp
{
lock(myLock)
{
- if(this.closed)
+ if(this.closed.Value)
{
return;
}
@@ -412,7 +426,7 @@ namespace Apache.NMS.Stomp
try
{
Tracer.Info("Closing Connection.");
- this.closing = true;
+ this.closing.Value = true;
lock(sessions.SyncRoot)
{
foreach(Session session in sessions)
@@ -422,7 +436,7 @@ namespace Apache.NMS.Stomp
}
sessions.Clear();
- if(connected)
+ if(connected.Value)
{
ShutdownInfo shutdowninfo = new ShutdownInfo();
transport.Oneway(shutdowninfo);
@@ -438,9 +452,9 @@ namespace Apache.NMS.Stomp
finally
{
this.transport = null;
- this.closed = true;
- this.connected = false;
- this.closing = false;
+ this.closed.Value = true;
+ this.connected.Value = false;
+ this.closing.Value = false;
}
}
}
@@ -534,24 +548,24 @@ namespace Apache.NMS.Stomp
protected void CheckConnected()
{
- if(closed)
+ if(closed.Value)
{
throw new ConnectionClosedException();
}
- if(!connected)
+ if(!connected.Value)
{
if(!this.userSpecifiedClientID)
{
this.info.ClientId = this.clientIdGenerator.GenerateId();
}
- connected = true;
+ connected.Value = true;
// now lets send the connection and see if we get an ack/nak
if(null == SyncRequest(info))
{
- closed = true;
- connected = false;
+ closed.Value = true;
+ connected.Value = false;
throw new ConnectionClosedException();
}
}
@@ -581,7 +595,7 @@ namespace Apache.NMS.Stomp
}
else if(command.IsErrorCommand)
{
- if(!closing && !closed)
+ if(!closing.Value && !closed.Value)
{
ConnectionError connectionError = (ConnectionError) command;
BrokerError brokerError = connectionError.Exception;
@@ -597,7 +611,7 @@ namespace Apache.NMS.Stomp
}
}
- OnException(commandTransport, new NMSConnectionException(message, cause));
+ OnException(new NMSConnectionException(message, cause));
}
}
else
@@ -632,17 +646,85 @@ namespace Apache.NMS.Stomp
Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
}
- protected void OnException(ITransport sender, Exception exception)
+ /// <summary>
+ /// Exception callback registered with the transport. The reporting
+ /// transport is not needed here, so only the error is forwarded to
+ /// OnException.
+ /// </summary>
+ /// <param name="sender">The transport that reported the error (unused).</param>
+ /// <param name="exception">The error reported by the transport.</param>
+ protected void OnTransportException(ITransport sender, Exception exception)
+ {
+     OnException(exception);
+ }
+
+ /// <summary>
+ /// Hands an error to the registered ExceptionListener, if there is one,
+ /// by queuing the callback on this connection's executor. Nothing is
+ /// done when the connection is closed or closing.
+ /// </summary>
+ /// <param name="error">
+ /// The error to report; anything that is not already an NMSException is
+ /// converted through NMSExceptionSupport.Create first.
+ /// </param>
+ internal void OnAsyncException(Exception error)
+ {
+     if(this.closed.Value || this.closing.Value)
+     {
+         return;
+     }
+
+     if(this.ExceptionListener == null)
+     {
+         Tracer.Debug("Async exception with no exception listener: " + error);
+         return;
+     }
+
+     NMSException wrapped = error as NMSException;
+     if(wrapped == null)
+     {
+         wrapped = (NMSException) NMSExceptionSupport.Create(error);
+     }
+
+     // The listener is invoked from the executor rather than inline so the
+     // calling thread can carry on and does not contend for locks.
+     executor.QueueUserWorkItem(AsyncCallExceptionListener, wrapped);
+ }
+
+ /// <summary>
+ /// Executor work item that delivers a queued error to the
+ /// ExceptionListener.
+ /// </summary>
+ /// <param name="error">
+ /// The NMSException queued by OnAsyncException; any other value is ignored.
+ /// </param>
+ private void AsyncCallExceptionListener(object error)
+ {
+     NMSException exception = error as NMSException;
+
+     // This runs later on an executor thread, so the listener that was
+     // present when the work item was queued may have been removed since.
+     // Take a snapshot and invoke that, rather than re-reading the event.
+     ExceptionListener listener = this.ExceptionListener;
+
+     if(exception == null || listener == null)
+     {
+         return;
+     }
+
+     listener(exception);
+ }
+
+ /// <summary>
+ /// Entry point for connection-level errors. The error is first offered to
+ /// the exception listener via OnAsyncException; then, unless the
+ /// connection is closing or closed, AsyncOnExceptionHandler is queued on
+ /// the executor to do the failure handling.
+ /// </summary>
+ /// <param name="error">The error that was raised.</param>
+ internal void OnException(Exception error)
+ {
+     // Fires the exception listener callback when one is registered.
+     OnAsyncException(error);
+
+     if(this.closing.Value || this.closed.Value)
+     {
+         return;
+     }
+
+     // The real work happens on the executor so the caller is free to get
+     // on with its own error cleanup and no locks are contended here.
+     executor.QueueUserWorkItem(AsyncOnExceptionHandler, error);
+ }
+
+ private void AsyncOnExceptionHandler(object error)
{
- if(ExceptionListener != null && !this.closing)
+ Exception cause = error as Exception;
+
+ MarkTransportFailed(cause);
+
+ try
+ {
+ this.transport.Dispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.Debug("Caught Exception While disposing of Transport: " + ex);
+ }
+
+ IList sessionsCopy = null;
+ lock(this.sessions.SyncRoot)
+ {
+ sessionsCopy = new ArrayList(this.sessions);
+ }
+
+ // Use a copy so we don't concurrently modify the Sessions list if the
+ // client is closing at the same time.
+ foreach(Session session in sessionsCopy)
{
try
{
- ExceptionListener(exception);
+ session.Dispose();
}
- catch
+ catch(Exception ex)
{
- sender.Dispose();
+ Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
}
}
}
@@ -662,7 +744,7 @@ namespace Apache.NMS.Stomp
session.ClearMessagesInProgress();
}
- if(this.ConnectionInterruptedListener != null && !this.closing)
+ if(this.ConnectionInterruptedListener != null && !this.closing.Value)
{
try
{
@@ -678,7 +760,7 @@ namespace Apache.NMS.Stomp
{
Tracer.Debug("Transport has resumed normal operation.");
- if(this.ConnectionResumedListener != null && !this.closing)
+ if(this.ConnectionResumedListener != null && !this.closing.Value)
{
try
{
@@ -705,6 +787,15 @@ namespace Apache.NMS.Stomp
}
}
+ /// <summary>
+ /// Sets the transportFailed flag and, when no failure has been recorded
+ /// yet, remembers the given error as the first failure.
+ /// </summary>
+ /// <param name="error">The error that caused the transport to fail.</param>
+ private void MarkTransportFailed(Exception error)
+ {
+     this.transportFailed.Value = true;
+
+     if(this.firstFailureError != null)
+     {
+         // An earlier failure is already on record; keep that one.
+         return;
+     }
+
+     this.firstFailureError = error;
+ }
+
/// <summary>
/// Creates a new temporary destination name
/// </summary>
@@ -739,7 +830,7 @@ namespace Apache.NMS.Stomp
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if(cdl != null)
{
- if(!closed && cdl.Remaining > 0)
+ if(!closed.Value && cdl.Remaining > 0)
{
Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
"processing (" + cdl.Remaining + ") to complete..");
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Threads/ThreadPoolExecutor.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Threads/ThreadPoolExecutor.cs b/src/main/csharp/Threads/ThreadPoolExecutor.cs
new file mode 100644
index 0000000..7072dfe
--- /dev/null
+++ b/src/main/csharp/Threads/ThreadPoolExecutor.cs
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+ /// <summary>
+ /// This class provides a wrapper around the ThreadPool mechanism in .NET
+ /// to allow for serial execution of jobs in the ThreadPool and provide
+ /// a means of shutting down the execution of jobs in a deterministic
+ /// way.
+ /// </summary>
+ /// <remarks>
+ /// The work queue and the running/closing/closed flags are all guarded by
+ /// one monitor (syncRoot). Shutdown blocks until the job that is currently
+ /// executing has returned, so it must not be called from inside a job that
+ /// is running on this executor.
+ /// </remarks>
+ public class ThreadPoolExecutor
+ {
+     private readonly Queue<Future> workQueue = new Queue<Future>();
+
+     // A plain monitor object. Every access to the queue and the state flags
+     // goes through lock(syncRoot) so that enqueueing, processing and
+     // shutdown all exclude one another.
+     private readonly object syncRoot = new object();
+
+     // True while a QueueProcessor pass is scheduled or executing.
+     private bool running = false;
+     private volatile bool closing = false;
+     private volatile bool closed = false;
+
+     // Signaled whenever no processor is scheduled or executing; reset when
+     // the first job of a batch is queued.
+     private readonly ManualResetEvent executionComplete = new ManualResetEvent(true);
+
+     /// <summary>
+     /// Represents an asynchronous task that is executed on the ThreadPool
+     /// at some point in the future.
+     /// </summary>
+     internal class Future
+     {
+         private WaitCallback callback;
+         private object callbackArg;
+
+         public Future(WaitCallback callback, object arg)
+         {
+             this.callback = callback;
+             this.callbackArg = arg;
+         }
+
+         /// <summary>
+         /// Invokes the callback with its argument. Exceptions thrown by the
+         /// callback are deliberately discarded so that one failing job does
+         /// not stop the jobs queued behind it.
+         /// </summary>
+         public void Run()
+         {
+             if(this.callback == null)
+             {
+                 throw new Exception("Future executed with null WaitCallback");
+             }
+
+             try
+             {
+                 this.callback(callbackArg);
+             }
+             catch
+             {
+                 // Best effort: a failing job must not break the chain.
+             }
+         }
+     }
+
+     /// <summary>
+     /// Queues a job that takes no state argument.
+     /// </summary>
+     /// <param name="worker">The callback to run; must not be null.</param>
+     /// <exception cref="ArgumentNullException">worker is null.</exception>
+     public void QueueUserWorkItem(WaitCallback worker)
+     {
+         this.QueueUserWorkItem(worker, null);
+     }
+
+     /// <summary>
+     /// Queues a job to run after all previously queued jobs. The job is
+     /// silently dropped when the executor is shutting down or shut down.
+     /// </summary>
+     /// <param name="worker">The callback to run; must not be null.</param>
+     /// <param name="arg">The state object handed to the callback.</param>
+     /// <exception cref="ArgumentNullException">worker is null.</exception>
+     public void QueueUserWorkItem(WaitCallback worker, object arg)
+     {
+         if(worker == null)
+         {
+             throw new ArgumentNullException("worker", "Invalid WaitCallback passed");
+         }
+
+         lock(syncRoot)
+         {
+             // Reject new work as soon as a shutdown has started, not only
+             // once it has finished.
+             if(this.closed || this.closing)
+             {
+                 return;
+             }
+
+             this.workQueue.Enqueue(new Future(worker, arg));
+
+             if(!this.running)
+             {
+                 this.executionComplete.Reset();
+                 this.running = true;
+                 ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+             }
+         }
+     }
+
+     /// <summary>
+     /// True once Shutdown has completed.
+     /// </summary>
+     public bool IsShutdown
+     {
+         get { return this.closed; }
+     }
+
+     /// <summary>
+     /// Discards all jobs that have not started yet, waits for the job that
+     /// is currently executing (if any) to return, and then marks the
+     /// executor as shut down. Calling it again afterwards is a no-op.
+     /// </summary>
+     public void Shutdown()
+     {
+         lock(syncRoot)
+         {
+             if(this.closed)
+             {
+                 return;
+             }
+
+             this.closing = true;
+             this.workQueue.Clear();
+         }
+
+         // Wait outside the lock so the processor can take it to publish its
+         // completion. The event is already signaled when nothing is running,
+         // in which case this returns immediately.
+         this.executionComplete.WaitOne();
+
+         lock(syncRoot)
+         {
+             this.closed = true;
+         }
+     }
+
+     /// <summary>
+     /// ThreadPool callback that runs exactly one queued job and then
+     /// schedules itself again, which is what keeps execution serial.
+     /// </summary>
+     private void QueueProcessor(object unused)
+     {
+         Future theTask = null;
+
+         lock(syncRoot)
+         {
+             if(this.workQueue.Count == 0 || this.closing)
+             {
+                 this.running = false;
+                 this.executionComplete.Set();
+                 return;
+             }
+
+             theTask = this.workQueue.Dequeue();
+         }
+
+         try
+         {
+             theTask.Run();
+         }
+         finally
+         {
+             lock(syncRoot)
+             {
+                 if(this.closing)
+                 {
+                     this.running = false;
+                     this.executionComplete.Set();
+                 }
+                 else
+                 {
+                     ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                 }
+             }
+         }
+     }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Transport/InactivityMonitor.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/InactivityMonitor.cs b/src/main/csharp/Transport/InactivityMonitor.cs
index 8fb5c95..1614394 100644
--- a/src/main/csharp/Transport/InactivityMonitor.cs
+++ b/src/main/csharp/Transport/InactivityMonitor.cs
@@ -45,6 +45,9 @@ namespace Apache.NMS.Stomp.Transport
private AsyncWriteTask asyncWriteTask;
private readonly Mutex monitor = new Mutex();
+
+ private static int id = 0;
+ private readonly int instanceId = 0;
private bool disposing = false;
private Timer connectionCheckTimer;
@@ -83,7 +86,8 @@ namespace Apache.NMS.Stomp.Transport
public InactivityMonitor(ITransport next)
: base(next)
{
- Tracer.Debug("Creating Inactivity Monitor");
+ this.instanceId = ++id;
+ Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
}
~InactivityMonitor()
@@ -98,8 +102,14 @@ namespace Apache.NMS.Stomp.Transport
// get rid of unmanaged stuff
}
- this.disposing = true;
- StopMonitorThreads();
+ lock(monitor)
+ {
+ this.localWireFormatInfo = null;
+ this.remoteWireFormatInfo = null;
+ this.disposing = true;
+ StopMonitorThreads();
+ }
+
base.Dispose(disposing);
}
@@ -123,19 +133,19 @@ namespace Apache.NMS.Stomp.Transport
{
if(this.inWrite.Value || this.failed.Value)
{
- Tracer.Debug("Inactivity Monitor is in write or already failed.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
return;
}
if(!commandSent.Value)
{
- //Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
- Tracer.Debug("Message sent since last write check. Resetting flag");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId);
}
commandSent.Value = false;
@@ -150,6 +160,7 @@ namespace Apache.NMS.Stomp.Transport
if(!AllowReadCheck(elapsed))
{
+ Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed.");
return;
}
@@ -157,12 +168,13 @@ namespace Apache.NMS.Stomp.Transport
if(this.inRead.Value || this.failed.Value || this.asyncErrorTask == null)
{
+ Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId);
return;
}
if(!commandReceived.Value)
{
- Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId);
this.asyncErrorTask.IsPending = true;
this.asyncTasks.Wakeup();
}
@@ -215,9 +227,9 @@ namespace Apache.NMS.Stomp.Transport
{
if(Tracer.IsDebugEnabled)
{
- Tracer.Debug("InactivityMonitor: New Keep Alive Received at -> " +
- DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
- "." + DateTime.Now.Millisecond);
+ Tracer.DebugFormat("InactivityMonitor[{0}]: New Keep Alive Received at -> " +
+ DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
+ "." + DateTime.Now.Millisecond, instanceId);
}
}
@@ -325,27 +337,29 @@ namespace Apache.NMS.Stomp.Transport
initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay;
- Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime );
- Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime );
- Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+ instanceId, readCheckTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+ instanceId, initialDelayTime );
this.asyncTasks = new CompositeTaskRunner();
if(this.asyncErrorTask != null)
{
- Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Adding the Async Read Check Task to the Runner.", instanceId);
this.asyncTasks.AddTask(this.asyncErrorTask);
}
if(this.asyncWriteTask != null)
{
- Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
+ instanceId, writeCheckTime );
this.asyncTasks.AddTask(this.asyncWriteTask);
}
if(this.asyncErrorTask != null || this.asyncWriteTask != null)
{
- Tracer.Debug("Inactivity: Starting the Monitor Timer.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Starting the Monitor Timer.", instanceId);
monitorStarted.Value = true;
this.connectionCheckTimer = new Timer(
@@ -427,10 +441,13 @@ namespace Apache.NMS.Stomp.Transport
public bool Iterate()
{
+ Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
+ Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
+ parent.instanceId);
KeepAliveInfo info = new KeepAliveInfo();
this.parent.next.Oneway(info);
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/main/csharp/Util/MessageDispatchChannel.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/MessageDispatchChannel.cs b/src/main/csharp/Util/MessageDispatchChannel.cs
index a1d7cfc..64bd7aa 100644
--- a/src/main/csharp/Util/MessageDispatchChannel.cs
+++ b/src/main/csharp/Util/MessageDispatchChannel.cs
@@ -25,16 +25,13 @@ namespace Apache.NMS.Stomp.Util
public class MessageDispatchChannel
{
private readonly Mutex mutex = new Mutex();
- private readonly ManualResetEvent wakeAll = new ManualResetEvent(false);
- private readonly AutoResetEvent waiter = new AutoResetEvent(false);
- private WaitHandle[] waiters;
+ private readonly ManualResetEvent waiter = new ManualResetEvent(false);
private bool closed;
private bool running;
private readonly LinkedList<MessageDispatch> channel = new LinkedList<MessageDispatch>();
public MessageDispatchChannel()
{
- this.waiters = new WaitHandle[] { this.waiter, this.wakeAll };
}
#region Properties
@@ -113,7 +110,7 @@ namespace Apache.NMS.Stomp.Util
if(!Closed)
{
this.running = true;
- this.wakeAll.Reset();
+ this.waiter.Reset();
}
}
}
@@ -123,7 +120,7 @@ namespace Apache.NMS.Stomp.Util
lock(mutex)
{
this.running = false;
- this.wakeAll.Set();
+ this.waiter.Set();
}
}
@@ -137,7 +134,7 @@ namespace Apache.NMS.Stomp.Util
this.closed = true;
}
- this.wakeAll.Set();
+ this.waiter.Set();
}
}
@@ -168,9 +165,15 @@ namespace Apache.NMS.Stomp.Util
// Wait until the channel is ready to deliver messages.
if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running ) )
{
- this.mutex.ReleaseMutex();
+ // This isn't the greatest way to do this, but to work on the
+ // .NETCF it's the only solution I could find so far. This
+ // code will only really work for one Thread using the event
+ // channel to wait, as all waiters are going to drop out of
+ // here regardless of the fact that only one message could
+ // be on the Queue.
this.waiter.Reset();
- ThreadUtil.WaitAny(this.waiters, (int)timeout.TotalMilliseconds, false);
+ this.mutex.ReleaseMutex();
+ this.waiter.WaitOne((int)timeout.TotalMilliseconds, false);
this.mutex.WaitOne();
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Threads/ThreadPoolExecutorTest.cs b/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
new file mode 100644
index 0000000..1da7241
--- /dev/null
+++ b/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.Stomp.Threads;
+using NUnit.Framework;
+
+namespace Apache.NMS.Stomp.Test
+{
+ /// <summary>
+ /// Tests for ThreadPoolExecutor: construction and shutdown, execution of
+ /// single and multiple jobs, propagation of the state argument, survival
+ /// of a throwing job, and purging of pending jobs on shutdown.
+ /// </summary>
+ [TestFixture]
+ public class ThreadPoolExecutorTest
+ {
+     private const int JOB_COUNT = 100;
+
+     // Upper bound for every wait on a queued job, so that a job that never
+     // runs fails the test instead of hanging the whole run.
+     private const int WAIT_TIMEOUT_MS = 30 * 1000;
+
+     private ManualResetEvent complete = new ManualResetEvent(false);
+     private bool waitingTaskCompleted = false;
+     private CountDownLatch doneLatch;
+     private int count = 0;
+
+     /// <summary>
+     /// Simple state object used to check that the job argument arrives.
+     /// </summary>
+     internal class DummyClass
+     {
+         private int data;
+
+         public DummyClass(int data)
+         {
+             this.data = data;
+         }
+
+         public int Data
+         {
+             get { return data; }
+         }
+     }
+
+     public ThreadPoolExecutorTest()
+     {
+     }
+
+     private void TaskThatSignalsWhenItsComplete(object unused)
+     {
+         waitingTaskCompleted = true;
+         complete.Set();
+     }
+
+     private void TaskThatCountsDown(object unused)
+     {
+         doneLatch.countDown();
+     }
+
+     private void TaskThatSleeps(object unused)
+     {
+         Thread.Sleep(5000);
+     }
+
+     private void TaskThatIncrementsCount(object unused)
+     {
+         count++;
+     }
+
+     private void TaskThatThrowsAnException(object unused)
+     {
+         throw new Exception("Throwing an Exception just because");
+     }
+
+     private void TaskThatValidatesTheArg(object arg)
+     {
+         DummyClass state = arg as DummyClass;
+         if(state != null && state.Data == 10 )
+         {
+             waitingTaskCompleted = true;
+         }
+         complete.Set();
+     }
+
+     [SetUp]
+     public void SetUp()
+     {
+         this.complete.Reset();
+         this.waitingTaskCompleted = false;
+         this.doneLatch = new CountDownLatch(JOB_COUNT);
+         this.count = 0;
+     }
+
+     [Test]
+     public void TestConstructor()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestSingleTaskExecuted()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         executor.QueueUserWorkItem(TaskThatSignalsWhenItsComplete);
+
+         Assert.IsTrue(this.complete.WaitOne(WAIT_TIMEOUT_MS, false),
+                       "Timed out waiting for the queued task to run");
+         Assert.IsTrue(this.waitingTaskCompleted);
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestTaskParamIsPropagated()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         executor.QueueUserWorkItem(TaskThatValidatesTheArg, new DummyClass(10));
+
+         Assert.IsTrue(this.complete.WaitOne(WAIT_TIMEOUT_MS, false),
+                       "Timed out waiting for the queued task to run");
+         Assert.IsTrue(this.waitingTaskCompleted);
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestAllTasksComplete()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         for(int i = 0; i < JOB_COUNT; ++i)
+         {
+             executor.QueueUserWorkItem(TaskThatCountsDown);
+         }
+
+         Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestAllTasksCompleteAfterException()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         executor.QueueUserWorkItem(TaskThatThrowsAnException);
+
+         for(int i = 0; i < JOB_COUNT; ++i)
+         {
+             executor.QueueUserWorkItem(TaskThatCountsDown);
+         }
+
+         Assert.IsTrue(this.doneLatch.await(TimeSpan.FromMilliseconds(30 * 1000)));
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestThatShutdownPurgesTasks()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         // The sleeping job keeps the executor busy so that the counting
+         // jobs are still pending when Shutdown is called.
+         executor.QueueUserWorkItem(TaskThatSleeps);
+
+         for(int i = 0; i < JOB_COUNT; ++i)
+         {
+             executor.QueueUserWorkItem(TaskThatIncrementsCount);
+         }
+
+         Thread.Sleep(100);
+
+         executor.Shutdown();
+         Assert.AreEqual(0, count);
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/vs2008-stomp-test.csproj
----------------------------------------------------------------------
diff --git a/vs2008-stomp-test.csproj b/vs2008-stomp-test.csproj
index b093a81..ffb03ae 100644
--- a/vs2008-stomp-test.csproj
+++ b/vs2008-stomp-test.csproj
@@ -101,6 +101,7 @@
<Compile Include="src\test\csharp\StompHelperTest.cs" />
<Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" />
<Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />
+ <Compile Include="src\test\csharp\Threads\ThreadPoolExecutorTest.cs" />
<Compile Include="src\test\csharp\Util\MessageDispatchChannelTest.cs" />
<Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />
<Compile Include="src\test\csharp\StompTopicTransactionTest.cs" />
http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/c0368061/vs2008-stomp.csproj
----------------------------------------------------------------------
diff --git a/vs2008-stomp.csproj b/vs2008-stomp.csproj
index 0b332d4..6921324 100644
--- a/vs2008-stomp.csproj
+++ b/vs2008-stomp.csproj
@@ -110,6 +110,7 @@
<Compile Include="src\main\csharp\RequestTimedOutException.cs" />
<Compile Include="src\main\csharp\Threads\CompositeTask.cs" />
<Compile Include="src\main\csharp\Threads\CompositeTaskRunner.cs" />
+ <Compile Include="src\main\csharp\Threads\ThreadPoolExecutor.cs" />
<Compile Include="src\main\csharp\Transport\Tcp\TcpTransport.cs" />
<Compile Include="src\main\csharp\Transport\Tcp\TcpTransportFactory.cs" />
<Compile Include="src\main\csharp\Transport\FutureResponse.cs" />