You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/01/26 20:19:28 UTC
svn commit: r1063838 - in
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src:
main/csharp/Connection.cs main/csharp/Threads/ThreadPoolExecutor.cs
main/csharp/Transport/InactivityMonitor.cs
test/csharp/Threads/ThreadPoolExecutorTest.cs
Author: tabish
Date: Wed Jan 26 19:19:27 2011
New Revision: 1063838
URL: http://svn.apache.org/viewvc?rev=1063838&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-312
Added:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs (with props)
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=1063838&r1=1063837&r2=1063838&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Wed Jan 26 19:19:27 2011
@@ -20,6 +20,7 @@ using System.Collections;
using System.Collections.Specialized;
using System.Threading;
using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Threads;
using Apache.NMS.Stomp.Transport;
using Apache.NMS.Stomp.Util;
using Apache.NMS.Util;
@@ -52,9 +53,11 @@ namespace Apache.NMS.Stomp
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
- private bool connected = false;
- private bool closed = false;
- private bool closing = false;
+ private readonly Atomic<bool> connected = new Atomic<bool>(false);
+ private readonly Atomic<bool> closed = new Atomic<bool>(false);
+ private readonly Atomic<bool> closing = new Atomic<bool>(false);
+ private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+ private Exception firstFailureError = null;
private int sessionCounter = 0;
private int temporaryDestinationCounter = 0;
private int localTransactionCounter;
@@ -64,6 +67,7 @@ namespace Apache.NMS.Stomp
private readonly IdGenerator clientIdGenerator;
private CountDownLatch transportInterruptionProcessingComplete;
private readonly MessageTransformation messageTransformation;
+ private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
{
@@ -72,7 +76,7 @@ namespace Apache.NMS.Stomp
this.transport = transport;
this.transport.Command = new CommandHandler(OnCommand);
- this.transport.Exception = new ExceptionHandler(OnException);
+ this.transport.Exception = new ExceptionHandler(OnTransportException);
this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
this.transport.Resumed = new ResumedHandler(OnTransportResumed);
@@ -215,6 +219,16 @@ namespace Apache.NMS.Stomp
set { this.transport = value; }
}
+ /// <summary>
+ /// Reports the current value of this connection's transportFailed flag.
+ /// </summary>
+ public bool TransportFailed
+ {
+     get
+     {
+         bool failed = this.transportFailed.Value;
+         return failed;
+     }
+ }
+
+ /// <summary>
+ /// Returns the exception currently stored in firstFailureError, or null
+ /// when none has been stored.
+ /// </summary>
+ public Exception FirstFailureError
+ {
+     get
+     {
+         Exception recorded = this.firstFailureError;
+         return recorded;
+     }
+ }
+
public TimeSpan RequestTimeout
{
get { return this.requestTimeout; }
@@ -232,7 +246,7 @@ namespace Apache.NMS.Stomp
get { return info.ClientId; }
set
{
- if(this.connected)
+ if(this.connected.Value)
{
throw new NMSException("You cannot change the ClientId once the Connection is connected");
}
@@ -384,7 +398,7 @@ namespace Apache.NMS.Stomp
internal void RemoveSession(Session session)
{
- if(!this.closing)
+ if(!this.closing.Value)
{
sessions.Remove(session);
}
@@ -404,7 +418,7 @@ namespace Apache.NMS.Stomp
{
lock(myLock)
{
- if(this.closed)
+ if(this.closed.Value)
{
return;
}
@@ -412,7 +426,7 @@ namespace Apache.NMS.Stomp
try
{
Tracer.Info("Closing Connection.");
- this.closing = true;
+ this.closing.Value = true;
lock(sessions.SyncRoot)
{
foreach(Session session in sessions)
@@ -422,7 +436,7 @@ namespace Apache.NMS.Stomp
}
sessions.Clear();
- if(connected)
+ if(connected.Value)
{
ShutdownInfo shutdowninfo = new ShutdownInfo();
transport.Oneway(shutdowninfo);
@@ -438,9 +452,9 @@ namespace Apache.NMS.Stomp
finally
{
this.transport = null;
- this.closed = true;
- this.connected = false;
- this.closing = false;
+ this.closed.Value = true;
+ this.connected.Value = false;
+ this.closing.Value = false;
}
}
}
@@ -534,24 +548,24 @@ namespace Apache.NMS.Stomp
protected void CheckConnected()
{
- if(closed)
+ if(closed.Value)
{
throw new ConnectionClosedException();
}
- if(!connected)
+ if(!connected.Value)
{
if(!this.userSpecifiedClientID)
{
this.info.ClientId = this.clientIdGenerator.GenerateId();
}
- connected = true;
+ connected.Value = true;
// now lets send the connection and see if we get an ack/nak
if(null == SyncRequest(info))
{
- closed = true;
- connected = false;
+ closed.Value = true;
+ connected.Value = false;
throw new ConnectionClosedException();
}
}
@@ -581,7 +595,7 @@ namespace Apache.NMS.Stomp
}
else if(command.IsErrorCommand)
{
- if(!closing && !closed)
+ if(!closing.Value && !closed.Value)
{
ConnectionError connectionError = (ConnectionError) command;
BrokerError brokerError = connectionError.Exception;
@@ -597,7 +611,7 @@ namespace Apache.NMS.Stomp
}
}
- OnException(commandTransport, new NMSConnectionException(message, cause));
+ OnException(new NMSConnectionException(message, cause));
}
}
else
@@ -632,17 +646,85 @@ namespace Apache.NMS.Stomp
Tracer.Error("No such consumer active: " + dispatch.ConsumerId);
}
- protected void OnException(ITransport sender, Exception exception)
+ /// <summary>
+ /// Transport-level exception callback: hands the reported exception to
+ /// OnException.
+ /// </summary>
+ /// <param name="sender">The transport that reported the error (not used here).</param>
+ /// <param name="exception">The error raised by the transport.</param>
+ protected void OnTransportException(ITransport sender, Exception exception)
+ {
+     // Only the exception is forwarded; the reporting transport plays no
+     // part in the handling done by OnException.
+     OnException(exception);
+ }
+
+ /// <summary>
+ /// Queues a call to the registered ExceptionListener on the executor so
+ /// the caller is never blocked by the listener.  Nothing happens when the
+ /// connection is closed or closing; when no listener is registered the
+ /// error is only written to the debug trace.
+ /// </summary>
+ /// <param name="error">The error to report to the listener.</param>
+ internal void OnAsyncException(Exception error)
+ {
+     if(this.closed.Value || this.closing.Value)
+     {
+         return;
+     }
+
+     if(this.ExceptionListener == null)
+     {
+         Tracer.Debug("Async exception with no exception listener: " + error);
+         return;
+     }
+
+     // Listeners are handed an NMSException, so anything else is converted first.
+     NMSException nmsError = error as NMSException;
+     if(nmsError == null)
+     {
+         nmsError = (NMSException) NMSExceptionSupport.Create(error);
+     }
+
+     // Hand off to the executor: the listener runs on another thread and
+     // this thread carries on without contending for locks.
+     executor.QueueUserWorkItem(AsyncCallExceptionListener, nmsError);
+ }
+
+ /// <summary>
+ /// Executor job that delivers a queued error to the ExceptionListener.
+ /// </summary>
+ /// <param name="error">
+ /// The NMSException queued by OnAsyncException; any other object is
+ /// delivered to the listener as null.
+ /// </param>
+ private void AsyncCallExceptionListener(object error)
+ {
+     NMSException exception = error as NMSException;
+
+     // This job runs some time after it was queued, so the listener may
+     // have been removed in between.  Take one snapshot and test that.
+     // NOTE(review): assumes the event's delegate type is Apache.NMS's
+     // ExceptionListener - confirm against the event declaration.
+     ExceptionListener listener = this.ExceptionListener;
+     if(listener == null)
+     {
+         return;
+     }
+
+     try
+     {
+         listener(exception);
+     }
+     catch(Exception ex)
+     {
+         // A faulty client listener must not take down the worker thread
+         // that is running this job.
+         Tracer.Debug("Caught Exception While calling the ExceptionListener: " + ex);
+     }
+ }
+
+ /// <summary>
+ /// Reports an error to the exception listener (via OnAsyncException) and,
+ /// unless the connection is closing or closed, queues
+ /// AsyncOnExceptionHandler on the executor to do the failure handling.
+ /// </summary>
+ /// <param name="error">The error that triggered the failure handling.</param>
+ internal void OnException(Exception error)
+ {
+     // Notifies the exception listener, when one is set.
+     OnAsyncException(error);
+
+     if(this.closing.Value || this.closed.Value)
+     {
+         return;
+     }
+
+     // The handling itself runs on the executor so the caller can get on
+     // with its own error cleanup without lock contention.
+     executor.QueueUserWorkItem(new WaitCallback(AsyncOnExceptionHandler), error);
+ }
+
+ private void AsyncOnExceptionHandler(object error)
{
- if(ExceptionListener != null && !this.closing)
+ Exception cause = error as Exception;
+
+ MarkTransportFailed(cause);
+
+ try
+ {
+ this.transport.Dispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.Debug("Caught Exception While disposing of Transport: " + ex);
+ }
+
+ IList sessionsCopy = null;
+ lock(this.sessions.SyncRoot)
+ {
+ sessionsCopy = new ArrayList(this.sessions);
+ }
+
+ // Use a copy so we don't concurrently modify the Sessions list if the
+ // client is closing at the same time.
+ foreach(Session session in sessionsCopy)
{
try
{
- ExceptionListener(exception);
+ session.Dispose();
}
- catch
+ catch(Exception ex)
{
- sender.Dispose();
+ Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
}
}
}
@@ -662,7 +744,7 @@ namespace Apache.NMS.Stomp
session.ClearMessagesInProgress();
}
- if(this.ConnectionInterruptedListener != null && !this.closing)
+ if(this.ConnectionInterruptedListener != null && !this.closing.Value)
{
try
{
@@ -678,7 +760,7 @@ namespace Apache.NMS.Stomp
{
Tracer.Debug("Transport has resumed normal operation.");
- if(this.ConnectionResumedListener != null && !this.closing)
+ if(this.ConnectionResumedListener != null && !this.closing.Value)
{
try
{
@@ -705,6 +787,15 @@ namespace Apache.NMS.Stomp
}
}
+ /// <summary>
+ /// Flags the transport as failed and remembers the first error that was
+ /// reported; errors reported later never replace the first one.
+ /// </summary>
+ /// <param name="error">The error that caused the failure.</param>
+ private void MarkTransportFailed(Exception error)
+ {
+     // Atomically store the error only while the field is still null, so
+     // two failures reported at the same time cannot overwrite each other.
+     Interlocked.CompareExchange<Exception>(ref this.firstFailureError, error, null);
+
+     // Raise the flag after the cause is recorded, so the cause is already
+     // in place whenever the flag reads true.
+     this.transportFailed.Value = true;
+ }
+
/// <summary>
/// Creates a new temporary destination name
/// </summary>
@@ -739,7 +830,7 @@ namespace Apache.NMS.Stomp
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if(cdl != null)
{
- if(!closed && cdl.Remaining > 0)
+ if(!closed.Value && cdl.Remaining > 0)
{
Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
"processing (" + cdl.Remaining + ") to complete..");
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs?rev=1063838&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs Wed Jan 26 19:19:27 2011
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.Stomp.Threads
+{
+ /// <summary>
+ /// Runs queued jobs one at a time, in the order they were queued, on the
+ /// .NET ThreadPool, and provides a Shutdown that discards pending jobs
+ /// and waits for the job that is currently running to finish.
+ /// </summary>
+ public class ThreadPoolExecutor
+ {
+     private readonly Queue<Future> workQueue = new Queue<Future>();
+
+     // Guards workQueue, running, closing and closed.  Every critical
+     // section uses lock() on this one object; mixing lock() on a Mutex
+     // with Mutex.WaitOne() would use two unrelated locks.
+     private readonly object syncRoot = new object();
+
+     private bool running = false;
+     private volatile bool closing = false;
+     private volatile bool closed = false;
+
+     // Signalled exactly while no QueueProcessor pass is scheduled or running.
+     private readonly ManualResetEvent executionComplete = new ManualResetEvent(true);
+
+     /// <summary>
+     /// Represents an asynchronous task that is executed on the ThreadPool
+     /// at some point in the future.
+     /// </summary>
+     internal class Future
+     {
+         private readonly WaitCallback callback;
+         private readonly object callbackArg;
+
+         public Future(WaitCallback callback, object arg)
+         {
+             this.callback = callback;
+             this.callbackArg = arg;
+         }
+
+         /// <summary>
+         /// Invokes the stored callback with the stored argument.
+         /// </summary>
+         public void Run()
+         {
+             if(this.callback == null)
+             {
+                 throw new InvalidOperationException("Future executed with null WaitCallback");
+             }
+
+             this.callback(callbackArg);
+         }
+     }
+
+     /// <summary>
+     /// Queues a job that is called with a null argument.
+     /// </summary>
+     /// <param name="worker">The callback to run; must not be null.</param>
+     public void QueueUserWorkItem(WaitCallback worker)
+     {
+         this.QueueUserWorkItem(worker, null);
+     }
+
+     /// <summary>
+     /// Queues a job to run after all previously queued jobs.  The call is
+     /// silently ignored once a shutdown has started or completed.
+     /// </summary>
+     /// <param name="worker">The callback to run; must not be null.</param>
+     /// <param name="arg">The argument passed to the callback.</param>
+     /// <exception cref="ArgumentNullException">worker is null.</exception>
+     public void QueueUserWorkItem(WaitCallback worker, object arg)
+     {
+         if(worker == null)
+         {
+             throw new ArgumentNullException("worker", "Invalid WaitCallback passed");
+         }
+
+         lock(syncRoot)
+         {
+             // Refuse work as soon as a shutdown begins, not only once it
+             // has finished; otherwise a job could be queued behind the
+             // purge done by Shutdown.
+             if(this.closed || this.closing)
+             {
+                 return;
+             }
+
+             this.workQueue.Enqueue(new Future(worker, arg));
+
+             if(!this.running)
+             {
+                 this.executionComplete.Reset();
+                 this.running = true;
+                 ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+             }
+         }
+     }
+
+     /// <summary>
+     /// True once Shutdown has completed.
+     /// </summary>
+     public bool IsShutdown
+     {
+         get { return this.closed; }
+     }
+
+     /// <summary>
+     /// Discards every job that has not started, waits for the job that is
+     /// running (if any) to finish, then marks the executor as shut down.
+     /// Calling it from a job running on this executor would wait on that
+     /// same job and never return.
+     /// </summary>
+     public void Shutdown()
+     {
+         lock(syncRoot)
+         {
+             if(this.closed)
+             {
+                 return;
+             }
+
+             this.closing = true;
+             this.workQueue.Clear();
+         }
+
+         // Wait outside the lock: the processor needs the lock in order to
+         // notice 'closing' and signal completion.  The event is already
+         // signalled when nothing is running, so this returns at once then.
+         this.executionComplete.WaitOne();
+
+         lock(syncRoot)
+         {
+             this.closed = true;
+         }
+     }
+
+     /// <summary>
+     /// ThreadPool callback: runs at most one queued job, then either
+     /// schedules the next pass or, when shutting down, signals completion.
+     /// </summary>
+     private void QueueProcessor(object unused)
+     {
+         Future theTask = null;
+
+         lock(syncRoot)
+         {
+             if(this.workQueue.Count == 0 || this.closing)
+             {
+                 this.running = false;
+                 this.executionComplete.Set();
+                 return;
+             }
+
+             theTask = this.workQueue.Dequeue();
+         }
+
+         try
+         {
+             theTask.Run();
+         }
+         catch(Exception ex)
+         {
+             // An exception leaving a ThreadPool callback is unhandled and
+             // would end the process; log it so the remaining jobs still run.
+             Tracer.Error("ThreadPoolExecutor caught an Exception thrown from a queued task: " + ex);
+         }
+         finally
+         {
+             lock(syncRoot)
+             {
+                 if(this.closing)
+                 {
+                     this.running = false;
+                     this.executionComplete.Set();
+                 }
+                 else
+                 {
+                     ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
+                 }
+             }
+         }
+     }
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Threads/ThreadPoolExecutor.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1063838&r1=1063837&r2=1063838&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs Wed Jan 26 19:19:27 2011
@@ -45,6 +45,9 @@ namespace Apache.NMS.Stomp.Transport
private AsyncWriteTask asyncWriteTask;
private readonly Mutex monitor = new Mutex();
+
+ private static int id = 0;
+ private readonly int instanceId = 0;
private bool disposing = false;
private Timer connectionCheckTimer;
@@ -83,7 +86,8 @@ namespace Apache.NMS.Stomp.Transport
public InactivityMonitor(ITransport next)
: base(next)
{
- Tracer.Debug("Creating Inactivity Monitor");
+ this.instanceId = ++id;
+ Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
}
~InactivityMonitor()
@@ -98,8 +102,14 @@ namespace Apache.NMS.Stomp.Transport
// get rid of unmanaged stuff
}
- this.disposing = true;
- StopMonitorThreads();
+ lock(monitor)
+ {
+ this.localWireFormatInfo = null;
+ this.remoteWireFormatInfo = null;
+ this.disposing = true;
+ StopMonitorThreads();
+ }
+
base.Dispose(disposing);
}
@@ -123,19 +133,19 @@ namespace Apache.NMS.Stomp.Transport
{
if(this.inWrite.Value || this.failed.Value)
{
- Tracer.Debug("Inactivity Monitor is in write or already failed.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
return;
}
if(!commandSent.Value)
{
- //Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
- Tracer.Debug("Message sent since last write check. Resetting flag");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId);
}
commandSent.Value = false;
@@ -150,6 +160,7 @@ namespace Apache.NMS.Stomp.Transport
if(!AllowReadCheck(elapsed))
{
+ Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed.");
return;
}
@@ -157,12 +168,13 @@ namespace Apache.NMS.Stomp.Transport
if(this.inRead.Value || this.failed.Value || this.asyncErrorTask == null)
{
+ Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId);
return;
}
if(!commandReceived.Value)
{
- Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId);
this.asyncErrorTask.IsPending = true;
this.asyncTasks.Wakeup();
}
@@ -215,9 +227,9 @@ namespace Apache.NMS.Stomp.Transport
{
if(Tracer.IsDebugEnabled)
{
- Tracer.Debug("InactivityMonitor: New Keep Alive Received at -> " +
- DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
- "." + DateTime.Now.Millisecond);
+ Tracer.DebugFormat("InactivityMonitor[{0}]: New Keep Alive Received at -> " +
+ DateTime.Now.ToLongTimeString().TrimEnd(" APM".ToCharArray()) +
+ "." + DateTime.Now.Millisecond, instanceId);
}
}
@@ -325,27 +337,29 @@ namespace Apache.NMS.Stomp.Transport
initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay;
- Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime );
- Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime );
- Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+ instanceId, readCheckTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+ instanceId, initialDelayTime );
this.asyncTasks = new CompositeTaskRunner();
if(this.asyncErrorTask != null)
{
- Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Adding the Async Read Check Task to the Runner.", instanceId);
this.asyncTasks.AddTask(this.asyncErrorTask);
}
if(this.asyncWriteTask != null)
{
- Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
+ instanceId, writeCheckTime );
this.asyncTasks.AddTask(this.asyncWriteTask);
}
if(this.asyncErrorTask != null || this.asyncWriteTask != null)
{
- Tracer.Debug("Inactivity: Starting the Monitor Timer.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Starting the Monitor Timer.", instanceId);
monitorStarted.Value = true;
this.connectionCheckTimer = new Timer(
@@ -427,10 +441,13 @@ namespace Apache.NMS.Stomp.Transport
public bool Iterate()
{
+ Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
+ Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
+ parent.instanceId);
KeepAliveInfo info = new KeepAliveInfo();
this.parent.next.Oneway(info);
}
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs?rev=1063838&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs Wed Jan 26 19:19:27 2011
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.Stomp.Threads;
+using NUnit.Framework;
+
+namespace Apache.NMS.Stomp.Test
+{
+ /// <summary>
+ /// Tests for ThreadPoolExecutor: job execution, argument passing,
+ /// behaviour after a job throws, and purging of jobs on Shutdown.
+ /// </summary>
+ [TestFixture]
+ public class ThreadPoolExecutorTest
+ {
+     private const int JOB_COUNT = 100;
+
+     // Upper bound for every wait in this fixture, so that a broken
+     // executor makes a test fail instead of hanging the test run.
+     private static readonly TimeSpan WAIT_TIMEOUT = TimeSpan.FromMilliseconds(30 * 1000);
+
+     private ManualResetEvent complete = new ManualResetEvent(false);
+     private bool waitingTaskCompleted = false;
+     private CountDownLatch doneLatch;
+     private int count = 0;
+
+     /// <summary>
+     /// Simple state object used to check that a job receives its argument.
+     /// </summary>
+     internal class DummyClass
+     {
+         private int data;
+
+         public DummyClass(int data)
+         {
+             this.data = data;
+         }
+
+         public int Data
+         {
+             get { return data; }
+         }
+     }
+
+     public ThreadPoolExecutorTest()
+     {
+     }
+
+     // Job: records that it ran and releases the waiting test.
+     private void TaskThatSignalsWhenItsComplete(object unused)
+     {
+         waitingTaskCompleted = true;
+         complete.Set();
+     }
+
+     // Job: counts the latch down by one.
+     private void TaskThatCountsDown(object unused)
+     {
+         doneLatch.countDown();
+     }
+
+     // Job: keeps the executor busy for five seconds.
+     private void TaskThatSleeps(object unused)
+     {
+         Thread.Sleep(5000);
+     }
+
+     // Job: bumps the counter; used to detect jobs that should not have run.
+     private void TaskThatIncrementsCount(object unused)
+     {
+         count++;
+     }
+
+     // Job: always fails.
+     private void TaskThatThrowsAnException(object unused)
+     {
+         throw new Exception("Throwing an Exception just because");
+     }
+
+     // Job: succeeds only when handed a DummyClass carrying the value 10.
+     private void TaskThatValidatesTheArg(object arg)
+     {
+         DummyClass state = arg as DummyClass;
+         if(state != null && state.Data == 10 )
+         {
+             waitingTaskCompleted = true;
+         }
+         complete.Set();
+     }
+
+     [SetUp]
+     public void SetUp()
+     {
+         this.complete.Reset();
+         this.waitingTaskCompleted = false;
+         this.doneLatch = new CountDownLatch(JOB_COUNT);
+         this.count = 0;
+     }
+
+     [Test]
+     public void TestConstructor()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestSingleTaskExecuted()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         executor.QueueUserWorkItem(TaskThatSignalsWhenItsComplete);
+
+         Assert.IsTrue(this.complete.WaitOne(WAIT_TIMEOUT, false), "Timed out waiting for the task to run");
+         Assert.IsTrue(this.waitingTaskCompleted);
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestTaskParamIsPropagated()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         executor.QueueUserWorkItem(TaskThatValidatesTheArg, new DummyClass(10));
+
+         Assert.IsTrue(this.complete.WaitOne(WAIT_TIMEOUT, false), "Timed out waiting for the task to run");
+         Assert.IsTrue(this.waitingTaskCompleted);
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestAllTasksComplete()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         for(int i = 0; i < JOB_COUNT; ++i)
+         {
+             executor.QueueUserWorkItem(TaskThatCountsDown);
+         }
+
+         Assert.IsTrue(this.doneLatch.await(WAIT_TIMEOUT));
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestAllTasksCompleteAfterException()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         executor.QueueUserWorkItem(TaskThatThrowsAnException);
+
+         for(int i = 0; i < JOB_COUNT; ++i)
+         {
+             executor.QueueUserWorkItem(TaskThatCountsDown);
+         }
+
+         Assert.IsTrue(this.doneLatch.await(WAIT_TIMEOUT));
+
+         executor.Shutdown();
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+     [Test]
+     public void TestThatShutdownPurgesTasks()
+     {
+         ThreadPoolExecutor executor = new ThreadPoolExecutor();
+         Assert.IsNotNull(executor);
+         Assert.IsFalse(executor.IsShutdown);
+
+         // The sleeping job occupies the executor so that the counting
+         // jobs are still waiting in the queue when Shutdown is called.
+         executor.QueueUserWorkItem(TaskThatSleeps);
+
+         for(int i = 0; i < JOB_COUNT; ++i)
+         {
+             executor.QueueUserWorkItem(TaskThatIncrementsCount);
+         }
+
+         Thread.Sleep(100);
+
+         executor.Shutdown();
+         Assert.AreEqual(0, count);
+         Assert.IsTrue(executor.IsShutdown);
+     }
+
+ }
+}
+
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/Threads/ThreadPoolExecutorTest.cs
------------------------------------------------------------------------------
svn:eol-style = native