You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/12/22 20:20:15 UTC

svn commit: r1052037 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Threads/CompositeTaskRunner.cs Transport/InactivityMonitor.cs Transport/TransportFilter.cs

Author: tabish
Date: Wed Dec 22 19:20:14 2010
New Revision: 1052037

URL: http://svn.apache.org/viewvc?rev=1052037&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-298

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1052037&r1=1052036&r2=1052037&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs Wed Dec 22 19:20:14 2010
@@ -33,12 +33,21 @@ namespace Apache.NMS.ActiveMQ.Threads
         private bool terminated = false;
         private bool pending = false;
         private bool shutdown = false;
-        
+
+        private string name = "CompositeTaskRunner";
+
         public CompositeTaskRunner()
         {
             this.theThread = new Thread(Run) {IsBackground = true};
             this.theThread.Start();
         }
+
+        public CompositeTaskRunner(string name)
+        {
+            this.name = name;
+            this.theThread = new Thread(Run) {IsBackground = true};
+            this.theThread.Start();
+        }
 		
 		public void AddTask(CompositeTask task)
 		{
@@ -60,7 +69,7 @@ namespace Apache.NMS.ActiveMQ.Threads
 
         public void Shutdown(TimeSpan timeout)
         {
-            lock(mutex) 
+            lock(mutex)
             {
                 this.shutdown = true;
                 this.pending = true;
@@ -74,6 +83,8 @@ namespace Apache.NMS.ActiveMQ.Threads
                     Monitor.Wait(this.mutex, timeout);
                 }
             }
+
+            Tracer.Debug(name + ": Task Runner Shut Down");
         }
 
         public void Shutdown()

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1052037&r1=1052036&r2=1052037&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs Wed Dec 22 19:20:14 2010
@@ -49,6 +49,9 @@ namespace Apache.NMS.ActiveMQ.Transport
 
         private DateTime lastReadCheckTime;
 
+        private static int id = 0;
+        private readonly int instanceId = 0;
+
         private long readCheckTime;
         public long ReadCheckTime
         {
@@ -88,7 +91,8 @@ namespace Apache.NMS.ActiveMQ.Transport
         public InactivityMonitor(ITransport next)
             : base(next)
         {
-            Tracer.Debug("Creating Inactivity Monitor");
+            this.instanceId = ++id;
+            Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
         }
 
         ~InactivityMonitor()
@@ -103,9 +107,11 @@ namespace Apache.NMS.ActiveMQ.Transport
                 // get rid of unmanaged stuff
             }
 
-            StopMonitorThreads();
-
-            base.Dispose(disposing);
+            lock(monitor)
+            {
+                StopMonitorThreads();
+                base.Dispose(disposing);
+            }
         }
 		
 		public void CheckConnection(object state)
@@ -126,19 +132,19 @@ namespace Apache.NMS.ActiveMQ.Transport
         {
             if(this.inWrite.Value || this.failed.Value)
             {
-                Tracer.Debug("Inactivity Monitor is in write or already failed.");				
+                Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
                 return;
             }
 
             if(!commandSent.Value)
             {
-                Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
                 this.asyncWriteTask.IsPending = true;
                 this.asyncTasks.Wakeup();
             }
             else
             {
-                Tracer.Debug("Message sent since last write check. Resetting flag");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId);
             }
 
             commandSent.Value = false;
@@ -153,7 +159,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 
             if(!AllowReadCheck(elapsed))
             {
-                Tracer.Debug("Inactivity Monitor: A read check is not currently allowed.");				
+                Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed.");
                 return;
             }
 
@@ -161,13 +167,13 @@ namespace Apache.NMS.ActiveMQ.Transport
 
             if(this.inRead.Value || this.failed.Value)
             {
-                Tracer.Debug("A receive is in progress or already failed.");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId);
                 return;
             }
 
             if(!commandReceived.Value)
             {
-                Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+                Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId);
                 this.asyncErrorTask.IsPending = true;
                 this.asyncTasks.Wakeup();
             }
@@ -287,6 +293,11 @@ namespace Apache.NMS.ActiveMQ.Transport
         {
             lock(monitor)
             {
+                if(this.IsDisposed)
+                {
+                    return;
+                }
+
                 if(monitorStarted.Value)
                 {
                     return;
@@ -311,10 +322,12 @@ namespace Apache.NMS.ActiveMQ.Transport
                         localWireFormatInfo.MaxInactivityDurationInitialDelay,
                         remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
 				
-				Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime );
-				Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime );
+				Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+                                   instanceId, readCheckTime );
+				Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+                                   instanceId, initialDelayTime );
 
-                this.asyncTasks = new CompositeTaskRunner();
+                this.asyncTasks = new CompositeTaskRunner("InactivityMonitor[" + instanceId + "].Runner");
 
                 this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
                 this.asyncWriteTask = new AsyncWriteTask(this);
@@ -328,7 +341,8 @@ namespace Apache.NMS.ActiveMQ.Transport
 
                     writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
 
-					Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime );
+					Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
+                                       instanceId, writeCheckTime );
 									
                     this.connectionCheckTimer = new Timer(
                         new TimerCallback(CheckConnection),
@@ -351,14 +365,23 @@ namespace Apache.NMS.ActiveMQ.Transport
                     // Attempt to wait for the Timer to shutdown, but don't wait
                     // forever, if they don't shutdown after two seconds, just quit.
                     this.connectionCheckTimer.Dispose(shutdownEvent);
-                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false);
+                    if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false))
+                    {
+                        Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown properly.", instanceId);
+                    }
+
+                    this.asyncTasks.RemoveTask(this.asyncWriteTask);
+                    this.asyncTasks.RemoveTask(this.asyncErrorTask);
 
                     this.asyncTasks.Shutdown();
                     this.asyncTasks = null;
                     this.asyncWriteTask = null;
                     this.asyncErrorTask = null;
+                    this.connectionCheckTimer = null;
                 }
             }
+
+            Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped Monitor Threads.", instanceId);
         }
 
         #region Async Tasks
@@ -412,12 +435,13 @@ namespace Apache.NMS.ActiveMQ.Transport
 
             public bool Iterate()
             {
-				Tracer.Debug("AsyncWriteTask perparing for another Write Check");
+				Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
                 if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
                 {
                     try
                     {
-						Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
+						Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
+                                           parent.instanceId);
                         KeepAliveInfo info = new KeepAliveInfo();
                         info.ResponseRequired = this.parent.keepAliveResponseRequired.Value;
                         this.parent.Oneway(info);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1052037&r1=1052036&r2=1052037&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Wed Dec 22 19:20:14 2010
@@ -151,6 +151,8 @@ namespace Apache.NMS.ActiveMQ.Transport
 		{
 			if(disposing)
 			{
+                Tracer.Debug("TransportFilter disposing of next Transport: " +
+                             this.next.GetType().Name);
 				this.next.Dispose();
 			}
 			disposed = true;
@@ -190,6 +192,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 		
 		public virtual void Stop()
 		{
+            this.next.Stop();
 		}
 
         public Object Narrow(Type type)