You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/11/08 17:42:30 UTC

svn commit: r1032632 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: Protocol/StompFrame.cs Protocol/StompWireFormat.cs Transport/InactivityMonitor.cs

Author: tabish
Date: Mon Nov  8 16:42:30 2010
New Revision: 1032632

URL: http://svn.apache.org/viewvc?rev=1032632&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-284

Improved Inactivity Monitor code to work with Apollo, and StompWireFormat and StompFrame logging for easier debug.  

Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs?rev=1032632&r1=1032631&r2=1032632&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs Mon Nov  8 16:42:30 2010
@@ -31,6 +31,8 @@ namespace Apache.NMS.Stomp.Protocol
         public const String SEPARATOR = ":";
         /// Used to mark the End of the Frame.
         public const byte FRAME_TERMINUS = (byte) 0;
+        /// Used to denote a Special KeepAlive command that consists of a single newline.
+        public const String KEEPALIVE = "KEEPALIVE";
         
         private string command;
         private IDictionary properties = new Hashtable();
@@ -113,8 +115,33 @@ namespace Apache.NMS.Stomp.Protocol
             this.properties.Clear();
         }
 
+        public override string ToString()
+        {
+            StringBuilder builder = new StringBuilder();
+
+            builder.Append(GetType().Name + "[ ");
+            builder.Append("Command=" + Command);
+            builder.Append(", Properties={");
+            foreach(string key in this.properties.Keys)
+            {
+                builder.Append(" " + key + "=" + this.properties[key]);
+            }
+
+            builder.Append("}, ");
+            builder.Append("Content=" + this.content ?? this.content.ToString());
+            builder.Append("]");
+
+            return builder.ToString();
+        }
+
         public void ToStream(BinaryWriter dataOut)
         {
+            if(this.Command == KEEPALIVE)
+            {
+                dataOut.Write(NEWLINE);
+                return;
+            }
+
             StringBuilder builder = new StringBuilder();
             
             builder.Append(this.Command);
@@ -142,8 +169,12 @@ namespace Apache.NMS.Stomp.Protocol
         public void FromStream(BinaryReader dataIn)
         {
             this.ReadCommandHeader(dataIn);
-            this.ReadHeaders(dataIn);
-            this.ReadContent(dataIn);
+
+            if(this.command != KEEPALIVE)
+            {
+                this.ReadHeaders(dataIn);
+                this.ReadContent(dataIn);
+            }
         }
 
         private void ReadCommandHeader(BinaryReader dataIn)

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=1032632&r1=1032631&r2=1032632&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Mon Nov  8 16:42:30 2010
@@ -125,7 +125,12 @@ namespace Apache.NMS.Stomp.Protocol
         protected virtual Object CreateCommand(StompFrame frame)
         {
             string command = frame.Command;
-            
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Received " + frame.ToString());
+            }
+
             if(command == "RECEIPT")
             {
                 string text = frame.RemoveProperty("receipt-id");
@@ -137,15 +142,12 @@ namespace Apache.NMS.Stomp.Protocol
                         text = text.Substring("ignore:".Length);
                     }
 
-                    Tracer.Debug("StompWireFormat - Received RESPONSE command: CorrelationId = " + text);
-                    
                     answer.CorrelationId = Int32.Parse(text);
                     return answer;
                 }
             }
             else if(command == "CONNECTED")
             {
-                Tracer.Debug("StompWireFormat - Received CONNECTED command");
                 return ReadConnected(frame);
             }
             else if(command == "ERROR")
@@ -154,7 +156,6 @@ namespace Apache.NMS.Stomp.Protocol
                 
                 if(text != null && text.StartsWith("ignore:"))
                 {
-                    Tracer.Debug("StompWireFormat - Received ERROR Response command: correlationId = " + text);
                     Response answer = new Response();
                     answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
                     return answer;
@@ -170,18 +171,15 @@ namespace Apache.NMS.Stomp.Protocol
                     BrokerError error = new BrokerError();
                     error.Message = frame.RemoveProperty("message");
                     answer.Exception = error;
-                    Tracer.Debug("StompWireFormat - Received ERROR command: " + error.Message);                    
                     return answer;
                 }
             }
             else if(command == "KEEPALIVE")
             {
-                Tracer.Debug("StompWireFormat - Received KEEPALIVE command");
                 return new KeepAliveInfo();
             }
             else if(command == "MESSAGE")
             {
-                Tracer.Debug("StompWireFormat - Received MESSAGE command");
                 return ReadMessage(frame);
             }
             
@@ -192,12 +190,6 @@ namespace Apache.NMS.Stomp.Protocol
 
         protected virtual Command ReadConnected(StompFrame frame)
         {
-            Tracer.Debug("CONNECTED command: " + frame.Command + " headers: ");
-            foreach(string key in frame.Properties.Keys)
-            {
-                Tracer.DebugFormat("   property[{0}] = {1}", key, frame.Properties[key]);
-            }
-
             string responseId = frame.RemoveProperty("response-id");
 
             this.remoteWireFormatInfo = new WireFormatInfo();
@@ -415,6 +407,11 @@ namespace Apache.NMS.Stomp.Protocol
                 frame.SetProperty(key, map[key]);
             }
 
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
             frame.ToStream(dataOut);
         }
 
@@ -428,13 +425,16 @@ namespace Apache.NMS.Stomp.Protocol
 
             frame.SetProperty("message-id", command.LastMessageId.ToString());
 
-            Tracer.Debug("ACK - Outbound MessageId = " + frame.GetProperty("message-id"));
-            
             if(command.TransactionId != null)
             {
                 frame.SetProperty("transaction", command.TransactionId.ToString());
             }
 
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
             frame.ToStream(dataOut);
         }
         
@@ -456,14 +456,26 @@ namespace Apache.NMS.Stomp.Protocol
                 frame.SetProperty("heart-beat", command.WriteCheckInterval + "," + command.ReadCheckInterval);
             }
 
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
             frame.ToStream(dataOut);
         }
 
         protected virtual void WriteShutdownInfo(ShutdownInfo command, BinaryWriter dataOut)
         {
             System.Diagnostics.Debug.Assert(!command.ResponseRequired);
-            
-            new StompFrame("DISCONNECT").ToStream(dataOut);
+
+            StompFrame frame = new StompFrame("DISCONNECT");
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
+            frame.ToStream(dataOut);
         }
 
         protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut)
@@ -481,8 +493,6 @@ namespace Apache.NMS.Stomp.Protocol
             frame.SetProperty("selector", command.Selector);
             frame.SetProperty("ack", StompHelper.ToStomp(command.AckMode));
 
-            Tracer.Debug("SUBSCRIBE : Outbound AckMode = " + frame.GetProperty("ack"));
-            
             if(command.NoLocal)
             {
                 frame.SetProperty("no-local", command.NoLocal.ToString());
@@ -523,12 +533,24 @@ namespace Apache.NMS.Stomp.Protocol
                 frame.SetProperty("activemq.retroactive", command.Retroactive);
             }
 
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
             frame.ToStream(dataOut);
         }
 
         protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
         {
-            dataOut.Write((byte) '\n' );
+            StompFrame frame = new StompFrame(StompFrame.KEEPALIVE);
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
+            frame.ToStream(dataOut);
         }
 
         protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
@@ -544,6 +566,12 @@ namespace Apache.NMS.Stomp.Protocol
                     frame.SetProperty("receipt", command.CommandId);
                 }                
                 frame.SetProperty("id", consumerId.ToString() );
+
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+                }
+
                 frame.ToStream(dataOut);
             }
         }
@@ -574,6 +602,12 @@ namespace Apache.NMS.Stomp.Protocol
             }
             
             frame.SetProperty("transaction", command.TransactionId.ToString());
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("StompWireFormat - Writing " + frame.ToString());
+            }
+
             frame.ToStream(dataOut);
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1032632&r1=1032631&r2=1032632&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs Mon Nov  8 16:42:30 2010
@@ -49,21 +49,21 @@ namespace Apache.NMS.Stomp.Transport
 
         private DateTime lastReadCheckTime;
 
-        private long readCheckTime;
+        private long readCheckTime = 30000;
         public long ReadCheckTime
         {
             get { return this.readCheckTime; }
             set { this.readCheckTime = value; }
         }
 
-        private long writeCheckTime;
+        private long writeCheckTime = 10000;
         public long WriteCheckTime
         {
             get { return this.writeCheckTime; }
             set { this.writeCheckTime = value; }
         }
 
-        private long initialDelayTime;
+        private long initialDelayTime = 0;
         public long InitialDelayTime
         {
             get { return this.initialDelayTime; }
@@ -103,6 +103,8 @@ namespace Apache.NMS.Stomp.Transport
         
         public void CheckConnection(object state)
         {
+            Tracer.DebugFormat("Timer Elapsed at {0}", DateTime.Now.ToLocalTime());
+
             // First see if we have written or can write.
             WriteCheck();
             
@@ -123,16 +125,16 @@ namespace Apache.NMS.Stomp.Transport
                 return;
             }
 
-            if(!commandSent.Value)
-            {
+//            if(!commandSent.Value)
+//            {
                 Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
                 this.asyncWriteTask.IsPending = true;
                 this.asyncTasks.Wakeup();
-            }
-            else
-            {
-                Tracer.Debug("Message sent since last write check. Resetting flag");
-            }
+//            }
+//            else
+//            {
+//                Tracer.Debug("Message sent since last write check. Resetting flag");
+//            }
 
             commandSent.Value = false;
         }
@@ -178,7 +180,7 @@ namespace Apache.NMS.Stomp.Transport
         /// <returns></returns>
         public bool AllowReadCheck(TimeSpan elapsed)
         {
-            return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
+            return (elapsed.TotalMilliseconds > (readCheckTime + readCheckTime * 0.90) );
         }
         #endregion
 
@@ -316,16 +318,19 @@ namespace Apache.NMS.Stomp.Transport
 
                 if(this.asyncErrorTask != null)
                 {
+                    Tracer.Debug("Inactivity: Adding the Async Read Check Task to the Runner.");
                     this.asyncTasks.AddTask(this.asyncErrorTask);
                 }
 
                 if(this.asyncWriteTask != null)
                 {
+                    Tracer.Debug("Inactivity: Adding the Async Write Check Task to the Runner.");
                     this.asyncTasks.AddTask(this.asyncWriteTask);
                 }
 
                 if(this.asyncErrorTask != null || this.asyncWriteTask != null)
                 {
+                    Tracer.Debug("Inactivity: Starting the Monitor Timer.");
                     monitorStarted.Value = true;
 
                     this.connectionCheckTimer = new Timer(
@@ -410,12 +415,10 @@ namespace Apache.NMS.Stomp.Transport
 
             public bool Iterate()
             {
-                Tracer.Debug("AsyncWriteTask perparing for another Write Check");
                 if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
                 {
                     try
                     {
-                        Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
                         KeepAliveInfo info = new KeepAliveInfo();
                         this.parent.Oneway(info);
                     }