You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/01/25 20:42:01 UTC

svn commit: r902956 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: MessageConsumer.cs Session.cs SessionExecutor.cs Threads/CompositeTaskRunner.cs Threads/DedicatedTaskRunner.cs Threads/PooledTaskRunner.cs Threads/TaskRunner.cs

Author: tabish
Date: Mon Jan 25 19:42:00 2010
New Revision: 902956

URL: http://svn.apache.org/viewvc?rev=902956&view=rev
Log:
Fix for: https://issues.apache.org/activemq/browse/AMQNET-218

Adds disposeStopTimeout and closeStopTimeout to Session. If the timeout is not set to infinite and it expires, the dispatching thread in the Executor's task runner will be aborted.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Mon Jan 25 19:42:00 2010
@@ -72,7 +72,7 @@
 			this.info = info;
 			this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
 		}
-
+        
 		~MessageConsumer()
 		{
 			Dispose(false);
@@ -502,6 +502,11 @@
 								}
 
 								Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+
+                                // If aborted we stop the abort here and let normal processing resume.
+                                // This allows the session to shutdown normally and ack all messages
+                                // that have outstanding acks in this consumer.
+                                Thread.ResetAbort();
 							}
 						}
 						else

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Mon Jan 25 19:42:00 2010
@@ -54,6 +54,8 @@
         private bool disposed = false;
         private bool closed = false;
         private bool closing = false;
+        private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+        private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(-1);
         private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
         private AcknowledgementMode acknowledgementMode;
 
@@ -203,6 +205,18 @@
             get { return Interlocked.Increment(ref this.nextDeliveryId); }
         }
 
+        public long DisposeStopTimeout
+        {
+            get { return (long) this.disposeStopTimeout.TotalMilliseconds; }
+            set { this.disposeStopTimeout = TimeSpan.FromMilliseconds(value); }
+        }
+
+        public long CloseStopTimeout
+        {
+            get { return (long) this.closeStopTimeout.TotalMilliseconds; }
+            set { this.closeStopTimeout = TimeSpan.FromMilliseconds(value); }
+        }
+
         #endregion
 
         #region ISession Members
@@ -227,6 +241,9 @@
 
             try
             {
+                // Force a Stop when we are Disposing vs a Normal Close.
+                this.executor.Stop(this.disposeStopTimeout);
+
                 Close();
             }
             catch
@@ -284,7 +301,7 @@
                     this.closing = true;
 
                     // Stop all message deliveries from this Session
-                    Stop();
+                    this.executor.Stop(this.closeStopTimeout);
 
                     lock(consumers.SyncRoot)
                     {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs Mon Jan 25 19:42:00 2010
@@ -109,6 +109,21 @@
             }
         }
 
+        public void Stop(TimeSpan timeout)
+        {
+            if(messageQueue.Running)
+            {
+                messageQueue.Stop();
+                TaskRunner taskRunner = this.taskRunner;
+
+                if(taskRunner != null)
+                {
+                    this.taskRunner = null;
+                    taskRunner.ShutdownWithAbort(timeout);
+                }
+            }
+        }
+
         public void Close()
         {
             this.messageQueue.Close();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs Mon Jan 25 19:42:00 2010
@@ -83,6 +83,29 @@
             this.Shutdown(TimeSpan.FromMilliseconds(-1));
         }
 
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // Wait till the thread stops ( no need to wait if shutdown
+                // is called from thread that is shutting down)
+                if(Thread.CurrentThread != this.theThread && !this.terminated)
+                {
+                    Monitor.Wait(this.mutex, timeout);
+
+                    if(!this.terminated)
+                    {
+                        theThread.Abort();
+                    }
+                }
+            }
+        }
+
         public void Wakeup()
         {
             lock(mutex)
@@ -134,6 +157,7 @@
             }
             catch
             {
+                Thread.ResetAbort();
             }
             finally
             {        

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs Mon Jan 25 19:42:00 2010
@@ -49,16 +49,16 @@
 
         public void Shutdown(TimeSpan timeout)
         {
-            lock(mutex) 
+            lock(mutex)
             {
                 this.shutdown = true;
                 this.pending = true;
-                
+
                 Monitor.PulseAll(this.mutex);
 
                 // Wait till the thread stops ( no need to wait if shutdown
                 // is called from thread that is shutting down)
-                if(Thread.CurrentThread != this.theThread && !this.terminated) 
+                if(Thread.CurrentThread != this.theThread && !this.terminated)
                 {
                     Monitor.Wait(this.mutex, timeout);
                 }
@@ -70,6 +70,29 @@
             this.Shutdown(TimeSpan.FromMilliseconds(-1));
         }
 
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(mutex)
+            {
+                this.shutdown = true;
+                this.pending = true;
+
+                Monitor.PulseAll(this.mutex);
+
+                // Wait till the thread stops ( no need to wait if shutdown
+                // is called from thread that is shutting down)
+                if(Thread.CurrentThread != this.theThread && !this.terminated)
+                {
+                    Monitor.Wait(this.mutex, timeout);
+
+                    if(!this.terminated)
+                    {
+                        theThread.Abort();
+                    }
+                }
+            }
+        }
+
         public void Wakeup()
         {
             lock(mutex)
@@ -97,7 +120,6 @@
                         
                         if(this.shutdown)
                         {
-                            
                             return;
                         }
                     }
@@ -122,6 +144,8 @@
             }
             catch
             {
+                // Prevent the ThreadAbortException from propagating.
+                Thread.ResetAbort();
             }
             finally
             {        

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Mon Jan 25 19:42:00 2010
@@ -110,6 +110,27 @@
 			}
 		}
 
+        public void ShutdownWithAbort(TimeSpan timeout)
+        {
+            lock(runable)
+            {
+                _shutdown = true;
+
+                if(runningThread != System.Threading.Thread.CurrentThread)
+                {
+                    if(iterating)
+                    {
+                        System.Threading.Thread.Sleep(timeout);
+                    }
+
+                    if(iterating)
+                    {
+                        runningThread.Abort();
+                    }
+                }
+            }
+        }
+
 		public void Shutdown()
 		{
 			Shutdown(new TimeSpan(Timeout.Infinite));

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=902956&r1=902955&r2=902956&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs Mon Jan 25 19:42:00 2010
@@ -24,8 +24,40 @@
 	/// </summary>
 	public interface TaskRunner
 	{
+        /// <summary>
+        /// Wakeup the TaskRunner and have it check for any pending work that
+        /// needs to be completed.  If none is found it will go back to sleep
+        /// until another Wakeup call is made.
+        /// </summary>
 		void Wakeup();
+
+        /// <summary>
+        /// Attempt to Shutdown the TaskRunner.  This method will wait indefinitely
+        /// for the TaskRunner to quit if the task runner is in a call to its Task's
+        /// run method and that call never returns.
+        /// </summary>
 		void Shutdown();
+
+        /// <summary>
+        /// Performs a timed wait for the TaskRunner to shutdown.  If the TaskRunner
+        /// is in a call to its Task's run method and that does not return before the
+        /// timeout expires this method returns and the TaskRunner may remain in the
+        /// running state.
+        /// </summary>
+        /// <param name="timeout">
+        /// A <see cref="TimeSpan"/>
+        /// </param>
 		void Shutdown(TimeSpan timeout);
+
+        /// <summary>
+        /// Performs a timed wait for the TaskRunner to shutdown.  If the TaskRunner
+        /// is in a call to its Task's run method and that does not return before the
+        /// timeout expires this method sends an Abort to the Task thread and returns.
+        /// </summary>
+        /// <param name="timeout">
+        /// A <see cref="TimeSpan"/>
+        /// </param>
+        void ShutdownWithAbort(TimeSpan timeout);
+
 	}
 }