You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/25 21:03:29 UTC
svn commit: r902963 - in
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp:
./ Threads/
Author: tabish
Date: Mon Jan 25 20:03:28 2010
New Revision: 902963
URL: http://svn.apache.org/viewvc?rev=902963&view=rev
Log:
Fix for: https://issues.apache.org/activemq/browse/AMQNET-218
Adds disposeStopTimeout and closeStopTimeout to Session. If the timeout is not set to infinite, then once it expires the dispatching thread in the Executor's task runner will be aborted.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/SessionExecutor.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/CompositeTaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/DedicatedTaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/PooledTaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/TaskRunner.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs Mon Jan 25 20:03:28 2010
@@ -72,7 +72,7 @@
this.info = info;
this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
}
-
+
~MessageConsumer()
{
Dispose(false);
@@ -502,6 +502,11 @@
}
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+
+ // If aborted we stop the abort here and let normal processing resume.
+ // This allows the session to shutdown normally and ack all messages
+ // that have outstanding acks in this consumer.
+ Thread.ResetAbort();
}
}
else
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs Mon Jan 25 20:03:28 2010
@@ -54,6 +54,8 @@
private bool disposed = false;
private bool closed = false;
private bool closing = false;
+ private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+ private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(-1);
private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
private AcknowledgementMode acknowledgementMode;
@@ -203,6 +205,18 @@
get { return Interlocked.Increment(ref this.nextDeliveryId); }
}
+        /// <summary>
+        /// The stop timeout used when this Session is disposed, expressed in
+        /// milliseconds. The value is stored internally as a TimeSpan.
+        /// </summary>
+        public long DisposeStopTimeout
+        {
+            get
+            {
+                double millis = this.disposeStopTimeout.TotalMilliseconds;
+                return (long) millis;
+            }
+            set
+            {
+                this.disposeStopTimeout = TimeSpan.FromMilliseconds(value);
+            }
+        }
+
+        /// <summary>
+        /// The stop timeout used when this Session is closed, expressed in
+        /// milliseconds. The value is stored internally as a TimeSpan.
+        /// </summary>
+        public long CloseStopTimeout
+        {
+            get
+            {
+                double millis = this.closeStopTimeout.TotalMilliseconds;
+                return (long) millis;
+            }
+            set
+            {
+                this.closeStopTimeout = TimeSpan.FromMilliseconds(value);
+            }
+        }
+
#endregion
#region ISession Members
@@ -227,6 +241,9 @@
try
{
+ // Force a Stop when we are Disposing vs a Normal Close.
+ this.executor.Stop(this.disposeStopTimeout);
+
Close();
}
catch
@@ -284,7 +301,7 @@
this.closing = true;
// Stop all message deliveries from this Session
- Stop();
+ this.executor.Stop(this.closeStopTimeout);
lock(consumers.SyncRoot)
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/SessionExecutor.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/SessionExecutor.cs Mon Jan 25 20:03:28 2010
@@ -109,6 +109,21 @@
}
}
+        /// <summary>
+        /// Stops the message queue and, if a task runner is attached, detaches it
+        /// and shuts it down through TaskRunner.ShutdownWithAbort.
+        /// </summary>
+        /// <param name="timeout">
+        /// The timeout handed to the task runner's ShutdownWithAbort call.
+        /// </param>
+        public void Stop(TimeSpan timeout)
+        {
+            // Nothing to stop when the queue is not running.
+            if(!messageQueue.Running)
+            {
+                return;
+            }
+
+            messageQueue.Stop();
+
+            // Take a local reference so the field can be cleared before the
+            // runner is shut down.
+            TaskRunner runner = this.taskRunner;
+            if(runner == null)
+            {
+                return;
+            }
+
+            this.taskRunner = null;
+            runner.ShutdownWithAbort(timeout);
+        }
+
public void Close()
{
this.messageQueue.Close();
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/CompositeTaskRunner.cs Mon Jan 25 20:03:28 2010
@@ -83,6 +83,29 @@
this.Shutdown(TimeSpan.FromMilliseconds(-1));
}
+        /// <summary>
+        /// Flags the runner for shutdown, wakes its thread, and waits for it to
+        /// terminate. If the thread has not terminated when the timeout expires
+        /// it is aborted.
+        /// </summary>
+        /// <param name="timeout">
+        /// Maximum time to wait before aborting the thread. A value of -1
+        /// millisecond waits indefinitely and never aborts.
+        /// </param>
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // No need to wait if shutdown is called from the thread that is
+                // shutting down, or if that thread has already terminated.
+                if(Thread.CurrentThread == this.theThread || this.terminated)
+                {
+                    return;
+                }
+
+                if(timeout == TimeSpan.FromMilliseconds(-1))
+                {
+                    // Infinite timeout: keep waiting until the thread reports
+                    // termination. A pulse alone is not proof of termination, so
+                    // the flag is re-checked after every wakeup and the thread is
+                    // never aborted on this path.
+                    // NOTE(review): relies on the runner thread pulsing the mutex
+                    // when it sets 'terminated' -- confirm in Run().
+                    while(!this.terminated)
+                    {
+                        Monitor.Wait(this.mutex);
+                    }
+
+                    return;
+                }
+
+                // The mutex is pulsed for reasons other than termination (this
+                // method pulses it itself), so one Wait may return well before
+                // the timeout. Wait against a deadline so the thread always gets
+                // the full timeout before it is aborted.
+                DateTime deadline = DateTime.UtcNow + timeout;
+                TimeSpan remaining = timeout;
+
+                do
+                {
+                    Monitor.Wait(this.mutex, remaining);
+                    remaining = deadline - DateTime.UtcNow;
+                }
+                while(!this.terminated && remaining > TimeSpan.Zero);
+
+                if(!this.terminated)
+                {
+                    theThread.Abort();
+                }
+            }
+        }
+
public void Wakeup()
{
lock(mutex)
@@ -134,6 +157,7 @@
}
catch
{
+ Thread.ResetAbort();
}
finally
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/DedicatedTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/DedicatedTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/DedicatedTaskRunner.cs Mon Jan 25 20:03:28 2010
@@ -49,16 +49,16 @@
public void Shutdown(TimeSpan timeout)
{
- lock(mutex)
+ lock(mutex)
{
this.shutdown = true;
this.pending = true;
-
+
Monitor.PulseAll(this.mutex);
// Wait till the thread stops ( no need to wait if shutdown
// is called from thread that is shutting down)
- if(Thread.CurrentThread != this.theThread && !this.terminated)
+ if(Thread.CurrentThread != this.theThread && !this.terminated)
{
Monitor.Wait(this.mutex, timeout);
}
@@ -70,6 +70,29 @@
this.Shutdown(TimeSpan.FromMilliseconds(-1));
}
+        /// <summary>
+        /// Flags the runner for shutdown, wakes its thread, and waits for it to
+        /// terminate. If the thread has not terminated when the timeout expires
+        /// it is aborted.
+        /// </summary>
+        /// <param name="timeout">
+        /// Maximum time to wait before aborting the thread. A value of -1
+        /// millisecond waits indefinitely and never aborts.
+        /// </param>
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // No need to wait if shutdown is called from the thread that is
+                // shutting down, or if that thread has already terminated.
+                if(Thread.CurrentThread == this.theThread || this.terminated)
+                {
+                    return;
+                }
+
+                if(timeout == TimeSpan.FromMilliseconds(-1))
+                {
+                    // Infinite timeout: keep waiting until the thread reports
+                    // termination. A pulse alone is not proof of termination, so
+                    // the flag is re-checked after every wakeup and the thread is
+                    // never aborted on this path.
+                    // NOTE(review): relies on the runner thread pulsing the mutex
+                    // when it sets 'terminated' -- confirm in Run().
+                    while(!this.terminated)
+                    {
+                        Monitor.Wait(this.mutex);
+                    }
+
+                    return;
+                }
+
+                // The mutex is pulsed for reasons other than termination (this
+                // method pulses it itself), so one Wait may return well before
+                // the timeout. Wait against a deadline so the thread always gets
+                // the full timeout before it is aborted.
+                DateTime deadline = DateTime.UtcNow + timeout;
+                TimeSpan remaining = timeout;
+
+                do
+                {
+                    Monitor.Wait(this.mutex, remaining);
+                    remaining = deadline - DateTime.UtcNow;
+                }
+                while(!this.terminated && remaining > TimeSpan.Zero);
+
+                if(!this.terminated)
+                {
+                    theThread.Abort();
+                }
+            }
+        }
+
public void Wakeup()
{
lock(mutex)
@@ -97,7 +120,6 @@
if(this.shutdown)
{
-
return;
}
}
@@ -122,6 +144,8 @@
}
catch
{
+ // Prevent the ThreadAbortException from propagating.
+ Thread.ResetAbort();
}
finally
{
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/PooledTaskRunner.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/PooledTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/PooledTaskRunner.cs Mon Jan 25 20:03:28 2010
@@ -110,6 +110,27 @@
}
}
+        /// <summary>
+        /// Flags the runner as shut down and, when called from a thread other
+        /// than the running one, waits for the current iteration to finish. If
+        /// the runner is still iterating when the timeout expires the running
+        /// thread is aborted.
+        /// </summary>
+        /// <param name="timeout">
+        /// Maximum time to wait before aborting the running thread. A value of
+        /// -1 millisecond waits indefinitely and never aborts.
+        /// </param>
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(runable)
+            {
+                _shutdown = true;
+
+                // Called from the running thread itself, or nothing is iterating:
+                // there is nothing to wait for and nothing to abort.
+                if(runningThread == System.Threading.Thread.CurrentThread || !iterating)
+                {
+                    return;
+                }
+
+                // Monitor.Wait is used instead of Thread.Sleep so that the lock
+                // on 'runable' is released while waiting; sleeping with the lock
+                // held blocks every other thread that needs this monitor for the
+                // whole timeout.
+                // NOTE(review): an early wakeup needs the run loop to pulse
+                // 'runable' when it clears 'iterating' -- confirm; without a
+                // pulse the wait simply lasts the full timeout.
+                if(timeout == TimeSpan.FromMilliseconds(-1))
+                {
+                    // Infinite timeout: wait until the iteration ends, never abort.
+                    while(iterating)
+                    {
+                        System.Threading.Monitor.Wait(runable);
+                    }
+
+                    return;
+                }
+
+                // Wait against a deadline so that an unrelated pulse cannot cut
+                // the timeout short and trigger a premature abort.
+                DateTime deadline = DateTime.UtcNow + timeout;
+                TimeSpan remaining = timeout;
+
+                do
+                {
+                    System.Threading.Monitor.Wait(runable, remaining);
+                    remaining = deadline - DateTime.UtcNow;
+                }
+                while(iterating && remaining > TimeSpan.Zero);
+
+                if(iterating)
+                {
+                    runningThread.Abort();
+                }
+            }
+        }
+
public void Shutdown()
{
Shutdown(new TimeSpan(Timeout.Infinite));
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/TaskRunner.cs?rev=902963&r1=902962&r2=902963&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/TaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Threads/TaskRunner.cs Mon Jan 25 20:03:28 2010
@@ -24,8 +24,40 @@
/// </summary>
public interface TaskRunner
{
+ /// <summary>
+ /// Wakeup the TaskRunner and have it check for any pending work that
+ /// needs to be completed. If none is found it will go back to sleep
+ /// until another Wakeup call is made.
+ /// </summary>
void Wakeup();
+
+ /// <summary>
+ /// Attempts to shut down the TaskRunner. This method waits indefinitely
+ /// for the TaskRunner to quit, so it does not return if the TaskRunner is
+ /// in a call to its Task's run method that never returns.
+ /// </summary>
void Shutdown();
+
+ /// <summary>
+ /// Performs a timed wait for the TaskRunner to shutdown. If the TaskRunner
+ /// is in a call to its Task's run method and that does not return before the
+ /// timeout expires this method returns and the TaskRunner may remain in the
+ /// running state.
+ /// </summary>
+ /// <param name="timeout">
+ /// The maximum amount of time to wait for the TaskRunner to shut down.
+ /// </param>
void Shutdown(TimeSpan timeout);
+
+ /// <summary>
+ /// Performs a timed wait for the TaskRunner to shutdown. If the TaskRunner
+ /// is in a call to its Task's run method and that does not return before the
+ /// timeout expires this method sends an Abort to the Task thread and returns.
+ /// </summary>
+ /// <param name="timeout">
+ /// The maximum amount of time to wait before the Task thread is aborted.
+ /// </param>
void ShutdownWithAbort(TimeSpan timeout);
+
}
}