You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/06 21:17:54 UTC
svn commit: r515281 - in
/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ:
Commands/ActiveMQTextMessage.cs Transport/Stomp/StompWireFormat.cs
Author: chirino
Date: Tue Mar 6 12:17:53 2007
New Revision: 515281
URL: http://svn.apache.org/viewvc?view=rev&rev=515281
Log:
- Have the ToString of the TextMessage also dump out its text
- Improved the Stomp wireformat so that it sets the right headers on the Ack
- We now remove all the consumers associated with a session when a session is closed.
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs?view=diff&rev=515281&r1=515280&r2=515281
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs Tue Mar 6 12:17:53 2007
@@ -40,9 +40,13 @@
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
-
-
- public override byte GetDataStructureType()
+
+ public override string ToString()
+ {
+ return base.ToString() + " Text="+Text;
+ }
+
+ public override byte GetDataStructureType()
{
return ID_ActiveMQTextMessage;
}
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=515281&r1=515280&r2=515281
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Tue Mar 6 12:17:53 2007
@@ -32,8 +32,9 @@
public class StompWireFormat : IWireFormat
{
private Encoding encoding = new UTF8Encoding();
- private ITransport transport;
-
+ private ITransport transport;
+ private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+
public StompWireFormat()
{
}
@@ -312,20 +313,25 @@
{
ss.WriteCommand(command, "SUBSCRIBE");
ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
- ss.WriteHeader("selector", command.Selector);
- ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
- ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
- ss.WriteHeader("no-local", command.NoLocal);
+ ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+ ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+ ss.WriteHeader("selector", command.Selector);
+ if ( command.NoLocal )
+ ss.WriteHeader("no-local", command.NoLocal);
ss.WriteHeader("ack", "client");
// ActiveMQ extensions to STOMP
- ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
- ss.WriteHeader("activemq.exclusive", command.Exclusive);
+ ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+ if ( command.Exclusive )
+ ss.WriteHeader("activemq.exclusive", command.Exclusive);
+
ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
- ss.WriteHeader("activemq.priority ", command.Priority);
- ss.WriteHeader("activemq.retroactive", command.Retroactive);
-
+ ss.WriteHeader("activemq.priority ", command.Priority);
+ if ( command.Retroactive )
+ ss.WriteHeader("activemq.retroactive", command.Retroactive);
+
+ consumers[command.ConsumerId] = command.ConsumerId;
ss.Flush();
}
@@ -336,10 +342,35 @@
{
ConsumerId consumerId = id as ConsumerId;
ss.WriteCommand(command, "UNSUBSCRIBE");
- ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
-
- ss.Flush();
- }
+ ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+ ss.Flush();
+ consumers.Remove(consumerId);
+ }
+ // When a session is removed, it needs to remove it's consumers too.
+ if (id is SessionId)
+ {
+
+ // Find all the consumer that were part of the session.
+ SessionId sessionId = (SessionId) id;
+ ArrayList matches = new ArrayList();
+ foreach (DictionaryEntry entry in consumers)
+ {
+ ConsumerId t = (ConsumerId) entry.Key;
+ if( sessionId.ConnectionId==t.ConnectionId && sessionId.Value==t.SessionId )
+ {
+ matches.Add(t);
+ }
+ }
+
+ // Un-subscribe them.
+ foreach (ConsumerId consumerId in matches)
+ {
+ ss.WriteCommand(command, "UNSUBSCRIBE");
+ ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+ ss.Flush();
+ consumers.Remove(consumerId);
+ }
+ }
}
@@ -374,13 +405,20 @@
protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
{
ss.WriteCommand(command, "SEND");
- ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
- ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
- ss.WriteHeader("correlation-id", command.CorrelationId);
- ss.WriteHeader("expires", command.Expiration);
- ss.WriteHeader("priority", command.Priority);
- ss.WriteHeader("type", command.Type);
- ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+ ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+ if (command.ReplyTo != null)
+ ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+ if (command.CorrelationId != null )
+ ss.WriteHeader("correlation-id", command.CorrelationId);
+ if (command.Expiration != 0)
+ ss.WriteHeader("expires", command.Expiration);
+ if (command.Priority != 4)
+ ss.WriteHeader("priority", command.Priority);
+ if (command.Type != null)
+ ss.WriteHeader("type", command.Type);
+ if (command.TransactionId!=null)
+ ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+
ss.WriteHeader("persistent", command.Persistent);
// lets force the content to be marshalled
@@ -409,9 +447,10 @@
{
ss.WriteCommand(command, "ACK");
- // TODO handle bulk ACKs?
- ss.WriteHeader("message-id", command.FirstMessageId);
- ss.WriteHeader("transaction", command.TransactionId);
+ // TODO handle bulk ACKs?
+ ss.WriteHeader("message-id", StompHelper.ToStomp(command.FirstMessageId));
+ if( command.TransactionId!=null )
+ ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
ss.Flush();
}