You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/12 17:23:13 UTC
svn commit: r517271 -
/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Author: chirino
Date: Mon Mar 12 09:23:11 2007
New Revision: 517271
URL: http://svn.apache.org/viewvc?view=rev&rev=517271
Log:
Set the eol-style
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (contents, props changed)
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=517271&r1=517270&r2=517271
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Mon Mar 12 09:23:11 2007
@@ -32,8 +32,8 @@
public class StompWireFormat : IWireFormat
{
private Encoding encoding = new UTF8Encoding();
- private ITransport transport;
- private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+ private ITransport transport;
+ private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
public StompWireFormat()
{
@@ -91,41 +91,41 @@
response.CorrelationId = command.CommandId;
SendCommand(response);
}
- Console.WriteLine("#### Ignored command: " + o.GetType());
+ Console.WriteLine("#### Ignored command: " + o.GetType());
Console.Out.Flush();
}
else
{
- Console.WriteLine("#### Ignored command: " + o.GetType());
+ Console.WriteLine("#### Ignored command: " + o.GetType());
Console.Out.Flush();
}
}
- internal String ReadLine(BinaryReader dis)
- {
- MemoryStream ms = new MemoryStream();
- while (true)
- {
- int nextChar = dis.Read();
- if (nextChar < 0)
- {
- throw new IOException("Peer closed the stream.");
- }
- if( nextChar == 10 )
- {
- break;
- }
- ms.WriteByte((byte)nextChar);
- }
- byte[] data = ms.ToArray();
+ internal String ReadLine(BinaryReader dis)
+ {
+ MemoryStream ms = new MemoryStream();
+ while (true)
+ {
+ int nextChar = dis.Read();
+ if (nextChar < 0)
+ {
+ throw new IOException("Peer closed the stream.");
+ }
+ if( nextChar == 10 )
+ {
+ break;
+ }
+ ms.WriteByte((byte)nextChar);
+ }
+ byte[] data = ms.ToArray();
return encoding.GetString(data, 0, data.Length);
}
public Object Unmarshal(BinaryReader dis)
- {
+ {
string command;
- do {
+ do {
command = ReadLine(dis);
}
while (command == "");
@@ -133,7 +133,7 @@
Console.WriteLine("<<<< command: " + command);
IDictionary headers = new Hashtable();
- string line;
+ string line;
while ((line = ReadLine(dis)) != "")
{
int idx = line.IndexOf(':');
@@ -158,18 +158,18 @@
content = dis.ReadBytes(size);
}
else
- {
+ {
MemoryStream ms = new MemoryStream();
int nextChar;
while((nextChar = dis.Read()) != 0)
{
- if( nextChar < 0 )
- {
- // EOF ??
- break;
+ if( nextChar < 0 )
+ {
+ // EOF ??
+ break;
}
ms.WriteByte((byte)nextChar);
- }
+ }
content = ms.ToArray();
}
Object answer = CreateCommand(command, headers, content);
@@ -188,14 +188,14 @@
Response answer = new Response();
answer.CorrelationId = Int32.Parse(text);
return answer;
- } else if( command == "CONNECTED") {
- text = RemoveHeader(headers, "response-id");
- if (text != null)
- {
- Response answer = new Response();
- answer.CorrelationId = Int32.Parse(text);
- return answer;
- }
+ } else if( command == "CONNECTED") {
+ text = RemoveHeader(headers, "response-id");
+ if (text != null)
+ {
+ Response answer = new Response();
+ answer.CorrelationId = Int32.Parse(text);
+ return answer;
+ }
}
}
else if (command == "ERROR")
@@ -297,7 +297,7 @@
ss.WriteHeader("passcode", command.Password);
if (command.ResponseRequired)
- {
+ {
ss.WriteHeader("request-id", command.CommandId);
}
@@ -314,24 +314,24 @@
{
ss.WriteCommand(command, "SUBSCRIBE");
ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
- ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
- ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
- ss.WriteHeader("selector", command.Selector);
- if ( command.NoLocal )
+ ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+ ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+ ss.WriteHeader("selector", command.Selector);
+ if ( command.NoLocal )
ss.WriteHeader("no-local", command.NoLocal);
ss.WriteHeader("ack", "client");
// ActiveMQ extensions to STOMP
- ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+ ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
if ( command.Exclusive )
ss.WriteHeader("activemq.exclusive", command.Exclusive);
ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
- ss.WriteHeader("activemq.priority ", command.Priority);
+ ss.WriteHeader("activemq.priority ", command.Priority);
if ( command.Retroactive )
- ss.WriteHeader("activemq.retroactive", command.Retroactive);
-
+ ss.WriteHeader("activemq.retroactive", command.Retroactive);
+
consumers[command.ConsumerId] = command.ConsumerId;
ss.Flush();
}
@@ -344,33 +344,33 @@
ConsumerId consumerId = id as ConsumerId;
ss.WriteCommand(command, "UNSUBSCRIBE");
ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
- ss.Flush();
- consumers.Remove(consumerId);
- }
- // When a session is removed, it needs to remove it's consumers too.
- if (id is SessionId)
- {
-
- // Find all the consumer that were part of the session.
- SessionId sessionId = (SessionId) id;
- ArrayList matches = new ArrayList();
- foreach (DictionaryEntry entry in consumers)
- {
- ConsumerId t = (ConsumerId) entry.Key;
- if( sessionId.ConnectionId==t.ConnectionId && sessionId.Value==t.SessionId )
- {
- matches.Add(t);
- }
- }
-
- // Un-subscribe them.
- foreach (ConsumerId consumerId in matches)
- {
- ss.WriteCommand(command, "UNSUBSCRIBE");
- ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
- ss.Flush();
- consumers.Remove(consumerId);
- }
+ ss.Flush();
+ consumers.Remove(consumerId);
+ }
+ // When a session is removed, it needs to remove it's consumers too.
+ if (id is SessionId)
+ {
+
+ // Find all the consumer that were part of the session.
+ SessionId sessionId = (SessionId) id;
+ ArrayList matches = new ArrayList();
+ foreach (DictionaryEntry entry in consumers)
+ {
+ ConsumerId t = (ConsumerId) entry.Key;
+ if( sessionId.ConnectionId==t.ConnectionId && sessionId.Value==t.SessionId )
+ {
+ matches.Add(t);
+ }
+ }
+
+ // Un-subscribe them.
+ foreach (ConsumerId consumerId in matches)
+ {
+ ss.WriteCommand(command, "UNSUBSCRIBE");
+ ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+ ss.Flush();
+ consumers.Remove(consumerId);
+ }
}
}
@@ -406,16 +406,16 @@
protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
{
ss.WriteCommand(command, "SEND");
- ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
- if (command.ReplyTo != null)
- ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
- if (command.CorrelationId != null )
- ss.WriteHeader("correlation-id", command.CorrelationId);
- if (command.Expiration != 0)
- ss.WriteHeader("expires", command.Expiration);
+ ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+ if (command.ReplyTo != null)
+ ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+ if (command.CorrelationId != null )
+ ss.WriteHeader("correlation-id", command.CorrelationId);
+ if (command.Expiration != 0)
+ ss.WriteHeader("expires", command.Expiration);
if (command.Priority != 4)
- ss.WriteHeader("priority", command.Priority);
- if (command.Type != null)
+ ss.WriteHeader("priority", command.Priority);
+ if (command.Type != null)
ss.WriteHeader("type", command.Type);
if (command.TransactionId!=null)
ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
@@ -448,9 +448,9 @@
{
ss.WriteCommand(command, "ACK");
- // TODO handle bulk ACKs?
+ // TODO handle bulk ACKs?
ss.WriteHeader("message-id", StompHelper.ToStomp(command.FirstMessageId));
- if( command.TransactionId!=null )
+ if( command.TransactionId!=null )
ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
ss.Flush();
Propchange: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
------------------------------------------------------------------------------
svn:eol-style = native