You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/06 21:17:54 UTC

svn commit: r515281 - in /activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ: Commands/ActiveMQTextMessage.cs Transport/Stomp/StompWireFormat.cs

Author: chirino
Date: Tue Mar  6 12:17:53 2007
New Revision: 515281

URL: http://svn.apache.org/viewvc?view=rev&rev=515281
Log:
- Have the ToString of the TextMessage also dump out its text
- Improved the Stomp wireformat so that it sets the right headers on the Ack
   - We now remove all the consumers associated with a session when a session is closed.

Modified:
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs?view=diff&rev=515281&r1=515280&r2=515281
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs Tue Mar  6 12:17:53 2007
@@ -40,9 +40,13 @@
         // TODO generate Equals method
         // TODO generate GetHashCode method
         // TODO generate ToString method
-        
-        
-        public override byte GetDataStructureType()
+
+	    public override string ToString()
+	    {
+	        return base.ToString() + " Text="+Text;
+	    }
+
+	    public override byte GetDataStructureType()
         {
             return ID_ActiveMQTextMessage;
         }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=515281&r1=515280&r2=515281
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Tue Mar  6 12:17:53 2007
@@ -32,8 +32,9 @@
     public class StompWireFormat : IWireFormat
     {
 		private Encoding encoding = new UTF8Encoding();
-		private ITransport transport;
-		
+		private ITransport transport;
+        private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+
 		public StompWireFormat()
 		{
 		}
@@ -312,20 +313,25 @@
 		{
 			ss.WriteCommand(command, "SUBSCRIBE");
 			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
-			ss.WriteHeader("selector", command.Selector);
-			ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
-			ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
-			ss.WriteHeader("no-local", command.NoLocal);
+			ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+		    ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+            ss.WriteHeader("selector", command.Selector);
+            if ( command.NoLocal )
+                ss.WriteHeader("no-local", command.NoLocal);
 			ss.WriteHeader("ack", "client");
 
 			// ActiveMQ extensions to STOMP
-			ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
-			ss.WriteHeader("activemq.exclusive", command.Exclusive);
+			ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+            if ( command.Exclusive )
+			    ss.WriteHeader("activemq.exclusive", command.Exclusive);
+		    
 			ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
 			ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
-			ss.WriteHeader("activemq.priority ", command.Priority);
-			ss.WriteHeader("activemq.retroactive", command.Retroactive);
-			
+			ss.WriteHeader("activemq.priority ", command.Priority);
+            if ( command.Retroactive )
+			    ss.WriteHeader("activemq.retroactive", command.Retroactive);
+
+            consumers[command.ConsumerId] = command.ConsumerId;
 			ss.Flush();
 		}
 
@@ -336,10 +342,35 @@
 			{
 				ConsumerId consumerId = id as ConsumerId;
 				ss.WriteCommand(command, "UNSUBSCRIBE");
-				ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
-				
-				ss.Flush();
-			}
+				ss.WriteHeader("id", StompHelper.ToStomp(consumerId));				
+				ss.Flush();
+                consumers.Remove(consumerId);
+            }
+		    // When a session is removed, it needs to remove it's consumers too.
+            if (id is SessionId)
+            {
+                
+                // Find all the consumer that were part of the session.
+                SessionId sessionId = (SessionId) id;
+                ArrayList matches = new ArrayList();
+                foreach (DictionaryEntry entry in consumers)
+                {
+                    ConsumerId t = (ConsumerId) entry.Key;
+                    if( sessionId.ConnectionId==t.ConnectionId && sessionId.Value==t.SessionId )
+                    {
+                        matches.Add(t);
+                    }
+                }
+
+                // Un-subscribe them.
+                foreach (ConsumerId consumerId in matches)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE");
+                    ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+                    ss.Flush();
+                    consumers.Remove(consumerId);
+                }
+            }
 		}
 		
 		
@@ -374,13 +405,20 @@
 		protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
 		{
 			ss.WriteCommand(command, "SEND");
-			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
-			ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
-			ss.WriteHeader("correlation-id", command.CorrelationId);
-			ss.WriteHeader("expires", command.Expiration);
-			ss.WriteHeader("priority", command.Priority);
-			ss.WriteHeader("type", command.Type);
-			ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+            if (command.ReplyTo != null)
+			    ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+            if (command.CorrelationId != null )
+                ss.WriteHeader("correlation-id", command.CorrelationId);
+            if (command.Expiration != 0)
+                ss.WriteHeader("expires", command.Expiration);
+            if (command.Priority != 4)
+			    ss.WriteHeader("priority", command.Priority);
+            if (command.Type != null)
+                ss.WriteHeader("type", command.Type);            
+		    if (command.TransactionId!=null)
+			    ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+		    
 			ss.WriteHeader("persistent", command.Persistent);
 			
 			// lets force the content to be marshalled
@@ -409,9 +447,10 @@
 		{
 			ss.WriteCommand(command, "ACK");
 			
-			// TODO handle bulk ACKs?
-			ss.WriteHeader("message-id", command.FirstMessageId);
-			ss.WriteHeader("transaction", command.TransactionId);
+			// TODO handle bulk ACKs?
+            ss.WriteHeader("message-id", StompHelper.ToStomp(command.FirstMessageId));
+			if( command.TransactionId!=null )
+                ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
 
 			ss.Flush();
 		}