You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/11/08 17:42:30 UTC
svn commit: r1032632 - in
/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp:
Protocol/StompFrame.cs Protocol/StompWireFormat.cs
Transport/InactivityMonitor.cs
Author: tabish
Date: Mon Nov 8 16:42:30 2010
New Revision: 1032632
URL: http://svn.apache.org/viewvc?rev=1032632&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-284
Improved Inactivity Monitor code to work with Apollo, and StompWireFormat and StompFrame logging for easier debug.
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs?rev=1032632&r1=1032631&r2=1032632&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs Mon Nov 8 16:42:30 2010
@@ -31,6 +31,8 @@ namespace Apache.NMS.Stomp.Protocol
public const String SEPARATOR = ":";
/// Used to mark the End of the Frame.
public const byte FRAME_TERMINUS = (byte) 0;
+ /// Used to denote a Special KeepAlive command that consists of a single newline.
+ public const String KEEPALIVE = "KEEPALIVE";
private string command;
private IDictionary properties = new Hashtable();
@@ -113,8 +115,33 @@ namespace Apache.NMS.Stomp.Protocol
this.properties.Clear();
}
+ public override string ToString()
+ {
+ StringBuilder builder = new StringBuilder();
+
+ builder.Append(GetType().Name + "[ ");
+ builder.Append("Command=" + Command);
+ builder.Append(", Properties={");
+ foreach(string key in this.properties.Keys)
+ {
+ builder.Append(" " + key + "=" + this.properties[key]);
+ }
+
+ builder.Append("}, ");
+ builder.Append("Content=" + this.content ?? this.content.ToString());
+ builder.Append("]");
+
+ return builder.ToString();
+ }
+
public void ToStream(BinaryWriter dataOut)
{
+ if(this.Command == KEEPALIVE)
+ {
+ dataOut.Write(NEWLINE);
+ return;
+ }
+
StringBuilder builder = new StringBuilder();
builder.Append(this.Command);
@@ -142,8 +169,12 @@ namespace Apache.NMS.Stomp.Protocol
public void FromStream(BinaryReader dataIn)
{
this.ReadCommandHeader(dataIn);
- this.ReadHeaders(dataIn);
- this.ReadContent(dataIn);
+
+ if(this.command != KEEPALIVE)
+ {
+ this.ReadHeaders(dataIn);
+ this.ReadContent(dataIn);
+ }
}
private void ReadCommandHeader(BinaryReader dataIn)
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=1032632&r1=1032631&r2=1032632&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Mon Nov 8 16:42:30 2010
@@ -125,7 +125,12 @@ namespace Apache.NMS.Stomp.Protocol
protected virtual Object CreateCommand(StompFrame frame)
{
string command = frame.Command;
-
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Received " + frame.ToString());
+ }
+
if(command == "RECEIPT")
{
string text = frame.RemoveProperty("receipt-id");
@@ -137,15 +142,12 @@ namespace Apache.NMS.Stomp.Protocol
text = text.Substring("ignore:".Length);
}
- Tracer.Debug("StompWireFormat - Received RESPONSE command: CorrelationId = " + text);
-
answer.CorrelationId = Int32.Parse(text);
return answer;
}
}
else if(command == "CONNECTED")
{
- Tracer.Debug("StompWireFormat - Received CONNECTED command");
return ReadConnected(frame);
}
else if(command == "ERROR")
@@ -154,7 +156,6 @@ namespace Apache.NMS.Stomp.Protocol
if(text != null && text.StartsWith("ignore:"))
{
- Tracer.Debug("StompWireFormat - Received ERROR Response command: correlationId = " + text);
Response answer = new Response();
answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
return answer;
@@ -170,18 +171,15 @@ namespace Apache.NMS.Stomp.Protocol
BrokerError error = new BrokerError();
error.Message = frame.RemoveProperty("message");
answer.Exception = error;
- Tracer.Debug("StompWireFormat - Received ERROR command: " + error.Message);
return answer;
}
}
else if(command == "KEEPALIVE")
{
- Tracer.Debug("StompWireFormat - Received KEEPALIVE command");
return new KeepAliveInfo();
}
else if(command == "MESSAGE")
{
- Tracer.Debug("StompWireFormat - Received MESSAGE command");
return ReadMessage(frame);
}
@@ -192,12 +190,6 @@ namespace Apache.NMS.Stomp.Protocol
protected virtual Command ReadConnected(StompFrame frame)
{
- Tracer.Debug("CONNECTED command: " + frame.Command + " headers: ");
- foreach(string key in frame.Properties.Keys)
- {
- Tracer.DebugFormat(" property[{0}] = {1}", key, frame.Properties[key]);
- }
-
string responseId = frame.RemoveProperty("response-id");
this.remoteWireFormatInfo = new WireFormatInfo();
@@ -415,6 +407,11 @@ namespace Apache.NMS.Stomp.Protocol
frame.SetProperty(key, map[key]);
}
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
frame.ToStream(dataOut);
}
@@ -428,13 +425,16 @@ namespace Apache.NMS.Stomp.Protocol
frame.SetProperty("message-id", command.LastMessageId.ToString());
- Tracer.Debug("ACK - Outbound MessageId = " + frame.GetProperty("message-id"));
-
if(command.TransactionId != null)
{
frame.SetProperty("transaction", command.TransactionId.ToString());
}
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
frame.ToStream(dataOut);
}
@@ -456,14 +456,26 @@ namespace Apache.NMS.Stomp.Protocol
frame.SetProperty("heart-beat", command.WriteCheckInterval + "," + command.ReadCheckInterval);
}
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
frame.ToStream(dataOut);
}
protected virtual void WriteShutdownInfo(ShutdownInfo command, BinaryWriter dataOut)
{
System.Diagnostics.Debug.Assert(!command.ResponseRequired);
-
- new StompFrame("DISCONNECT").ToStream(dataOut);
+
+ StompFrame frame = new StompFrame("DISCONNECT");
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
+ frame.ToStream(dataOut);
}
protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut)
@@ -481,8 +493,6 @@ namespace Apache.NMS.Stomp.Protocol
frame.SetProperty("selector", command.Selector);
frame.SetProperty("ack", StompHelper.ToStomp(command.AckMode));
- Tracer.Debug("SUBSCRIBE : Outbound AckMode = " + frame.GetProperty("ack"));
-
if(command.NoLocal)
{
frame.SetProperty("no-local", command.NoLocal.ToString());
@@ -523,12 +533,24 @@ namespace Apache.NMS.Stomp.Protocol
frame.SetProperty("activemq.retroactive", command.Retroactive);
}
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
frame.ToStream(dataOut);
}
protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
{
- dataOut.Write((byte) '\n' );
+ StompFrame frame = new StompFrame(StompFrame.KEEPALIVE);
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
+ frame.ToStream(dataOut);
}
protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
@@ -544,6 +566,12 @@ namespace Apache.NMS.Stomp.Protocol
frame.SetProperty("receipt", command.CommandId);
}
frame.SetProperty("id", consumerId.ToString() );
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
frame.ToStream(dataOut);
}
}
@@ -574,6 +602,12 @@ namespace Apache.NMS.Stomp.Protocol
}
frame.SetProperty("transaction", command.TransactionId.ToString());
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+ }
+
frame.ToStream(dataOut);
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1032632&r1=1032631&r2=1032632&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs Mon Nov 8 16:42:30 2010
@@ -49,21 +49,21 @@ namespace Apache.NMS.Stomp.Transport
private DateTime lastReadCheckTime;
- private long readCheckTime;
+ private long readCheckTime = 30000;
public long ReadCheckTime
{
get { return this.readCheckTime; }
set { this.readCheckTime = value; }
}
- private long writeCheckTime;
+ private long writeCheckTime = 10000;
public long WriteCheckTime
{
get { return this.writeCheckTime; }
set { this.writeCheckTime = value; }
}
- private long initialDelayTime;
+ private long initialDelayTime = 0;
public long InitialDelayTime
{
get { return this.initialDelayTime; }
@@ -103,6 +103,8 @@ namespace Apache.NMS.Stomp.Transport
public void CheckConnection(object state)
{
+ Tracer.DebugFormat("Timer Elapsed at {0}", DateTime.Now.ToLocalTime());
+
// First see if we have written or can write.
WriteCheck();
@@ -123,16 +125,16 @@ namespace Apache.NMS.Stomp.Transport
return;
}
- if(!commandSent.Value)
- {
+// if(!commandSent.Value)
+// {
Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
this.asyncWriteTask.IsPending = true;
this.asyncTasks.Wakeup();
- }
- else
- {
- Tracer.Debug("Message sent since last write check. Resetting flag");
- }
+// }
+// else
+// {
+// Tracer.Debug("Message sent since last write check. Resetting flag");
+// }
commandSent.Value = false;
}
@@ -178,7 +180,7 @@ namespace Apache.NMS.Stomp.Transport
/// <returns></returns>
public bool AllowReadCheck(TimeSpan elapsed)
{
- return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
+ return (elapsed.TotalMilliseconds > (readCheckTime + readCheckTime * 0.90) );
}
#endregion
@@ -316,16 +318,19 @@ namespace Apache.NMS.Stomp.Transport
if(this.asyncErrorTask != null)
{
+ Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner.");
this.asyncTasks.AddTask(this.asyncErrorTask);
}
if(this.asyncWriteTask != null)
{
+ Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner.");
this.asyncTasks.AddTask(this.asyncWriteTask);
}
if(this.asyncErrorTask != null || this.asyncWriteTask != null)
{
+ Tracer.Debug("Inactivity: Starting the Monitor Timer.");
monitorStarted.Value = true;
this.connectionCheckTimer = new Timer(
@@ -410,12 +415,10 @@ namespace Apache.NMS.Stomp.Transport
public bool Iterate()
{
- Tracer.Debug("AsyncWriteTask perparing for another Write Check");
if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
{
try
{
- Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
KeepAliveInfo info = new KeepAliveInfo();
this.parent.Oneway(info);
}