You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/12/22 20:20:15 UTC
svn commit: r1052037 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Threads/CompositeTaskRunner.cs Transport/InactivityMonitor.cs Transport/TransportFilter.cs
Author: tabish
Date: Wed Dec 22 19:20:14 2010
New Revision: 1052037
URL: http://svn.apache.org/viewvc?rev=1052037&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-298
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs?rev=1052037&r1=1052036&r2=1052037&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/CompositeTaskRunner.cs Wed Dec 22 19:20:14 2010
@@ -33,12 +33,21 @@ namespace Apache.NMS.ActiveMQ.Threads
private bool terminated = false;
private bool pending = false;
private bool shutdown = false;
-
+
+ private string name = "CompositeTaskRunner";
+
public CompositeTaskRunner()
{
this.theThread = new Thread(Run) {IsBackground = true};
this.theThread.Start();
}
+
+ public CompositeTaskRunner(string name)
+ {
+ this.name = name;
+ this.theThread = new Thread(Run) {IsBackground = true};
+ this.theThread.Start();
+ }
public void AddTask(CompositeTask task)
{
@@ -60,7 +69,7 @@ namespace Apache.NMS.ActiveMQ.Threads
public void Shutdown(TimeSpan timeout)
{
- lock(mutex)
+ lock(mutex)
{
this.shutdown = true;
this.pending = true;
@@ -74,6 +83,8 @@ namespace Apache.NMS.ActiveMQ.Threads
Monitor.Wait(this.mutex, timeout);
}
}
+
+ Tracer.Debug(name + ": Task Runner Shut Down");
}
public void Shutdown()
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1052037&r1=1052036&r2=1052037&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs Wed Dec 22 19:20:14 2010
@@ -49,6 +49,9 @@ namespace Apache.NMS.ActiveMQ.Transport
private DateTime lastReadCheckTime;
+ private static int id = 0;
+ private readonly int instanceId = 0;
+
private long readCheckTime;
public long ReadCheckTime
{
@@ -88,7 +91,8 @@ namespace Apache.NMS.ActiveMQ.Transport
public InactivityMonitor(ITransport next)
: base(next)
{
- Tracer.Debug("Creating Inactivity Monitor");
+ this.instanceId = ++id;
+ Tracer.Debug("Creating Inactivity Monitor: " + instanceId);
}
~InactivityMonitor()
@@ -103,9 +107,11 @@ namespace Apache.NMS.ActiveMQ.Transport
// get rid of unmanaged stuff
}
- StopMonitorThreads();
-
- base.Dispose(disposing);
+ lock(monitor)
+ {
+ StopMonitorThreads();
+ base.Dispose(disposing);
+ }
}
public void CheckConnection(object state)
@@ -126,19 +132,19 @@ namespace Apache.NMS.ActiveMQ.Transport
{
if(this.inWrite.Value || this.failed.Value)
{
- Tracer.Debug("Inactivity Monitor is in write or already failed.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: is in write or already failed.", instanceId);
return;
}
if(!commandSent.Value)
{
- Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No Message sent since last write check. Sending a KeepAliveInfo.", instanceId);
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
}
else
{
- Tracer.Debug("Message sent since last write check. Resetting flag");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Message sent since last write check. Resetting flag.", instanceId);
}
commandSent.Value = false;
@@ -153,7 +159,7 @@ namespace Apache.NMS.ActiveMQ.Transport
if(!AllowReadCheck(elapsed))
{
- Tracer.Debug("Inactivity Monitor: A read check is not currently allowed.");
+ Tracer.Debug("InactivityMonitor["+ instanceId +"]: A read check is not currently allowed.");
return;
}
@@ -161,13 +167,13 @@ namespace Apache.NMS.ActiveMQ.Transport
if(this.inRead.Value || this.failed.Value)
{
- Tracer.Debug("A receive is in progress or already failed.");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: A receive is in progress or already failed.", instanceId);
return;
}
if(!commandReceived.Value)
{
- Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+ Tracer.DebugFormat("InactivityMonitor[{0}]: No message received since last read check! Sending an InactivityException!", instanceId);
this.asyncErrorTask.IsPending = true;
this.asyncTasks.Wakeup();
}
@@ -287,6 +293,11 @@ namespace Apache.NMS.ActiveMQ.Transport
{
lock(monitor)
{
+ if(this.IsDisposed)
+ {
+ return;
+ }
+
if(monitorStarted.Value)
{
return;
@@ -311,10 +322,12 @@ namespace Apache.NMS.ActiveMQ.Transport
localWireFormatInfo.MaxInactivityDurationInitialDelay,
remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
- Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime );
- Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
+ instanceId, readCheckTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Initial Delay time interval: {1}",
+ instanceId, initialDelayTime );
- this.asyncTasks = new CompositeTaskRunner();
+ this.asyncTasks = new CompositeTaskRunner("InactivityMonitor[" + instanceId + "].Runner");
this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
this.asyncWriteTask = new AsyncWriteTask(this);
@@ -328,7 +341,8 @@ namespace Apache.NMS.ActiveMQ.Transport
writeCheckTime = readCheckTime > 3 ? readCheckTime / 3 : readCheckTime;
- Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime );
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Write Check time interval: {1}",
+ instanceId, writeCheckTime );
this.connectionCheckTimer = new Timer(
new TimerCallback(CheckConnection),
@@ -351,14 +365,23 @@ namespace Apache.NMS.ActiveMQ.Transport
// Attempt to wait for the Timer to shutdown, but don't wait
// forever, if they don't shutdown after two seconds, just quit.
this.connectionCheckTimer.Dispose(shutdownEvent);
- shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false);
+ if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false))
+ {
+ Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown properly.", instanceId);
+ }
+
+ this.asyncTasks.RemoveTask(this.asyncWriteTask);
+ this.asyncTasks.RemoveTask(this.asyncErrorTask);
this.asyncTasks.Shutdown();
this.asyncTasks = null;
this.asyncWriteTask = null;
this.asyncErrorTask = null;
+ this.connectionCheckTimer = null;
}
}
+
+ Tracer.DebugFormat("InactivityMonitor[{0}]: Stopped Monitor Threads.", instanceId);
}
#region Async Tasks
@@ -412,12 +435,13 @@ namespace Apache.NMS.ActiveMQ.Transport
public bool Iterate()
{
- Tracer.Debug("AsyncWriteTask perparing for another Write Check");
+ Tracer.DebugFormat("InactivityMonitor[{0}] perparing for another Write Check", parent.instanceId);
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
- Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
+ Tracer.DebugFormat("InactivityMonitor[{0}] Write Check required sending KeepAlive.",
+ parent.instanceId);
KeepAliveInfo info = new KeepAliveInfo();
info.ResponseRequired = this.parent.keepAliveResponseRequired.Value;
this.parent.Oneway(info);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1052037&r1=1052036&r2=1052037&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Wed Dec 22 19:20:14 2010
@@ -151,6 +151,8 @@ namespace Apache.NMS.ActiveMQ.Transport
{
if(disposing)
{
+ Tracer.Debug("TransportFilter disposing of next Transport: " +
+ this.next.GetType().Name);
this.next.Dispose();
}
disposed = true;
@@ -190,6 +192,7 @@ namespace Apache.NMS.ActiveMQ.Transport
public virtual void Stop()
{
+ this.next.Stop();
}
public Object Narrow(Type type)