You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/12/09 16:55:14 UTC
svn commit: r888848 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk:
./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Protocol/
Author: tabish
Date: Wed Dec 9 15:55:13 2009
New Revision: 888848
URL: http://svn.apache.org/viewvc?rev=888848&view=rev
Log:
Work on simplifying the Command Marshaling phase and scale down the MessageProducer implementation to the basic needs of a Stomp client.
Added:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TextMessage.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ProducerInfo.cs Wed Dec 9 15:55:13 2009
@@ -25,7 +25,6 @@
ProducerId producerId;
Destination destination;
bool dispatchAsync;
- int windowSize;
///
/// <summery>
@@ -50,7 +49,6 @@
"ProducerId=" + ProducerId +
"Destination=" + Destination +
"DispatchAsync=" + DispatchAsync +
- "WindowSize=" + WindowSize +
"]";
}
@@ -72,12 +70,6 @@
set { this.dispatchAsync = value; }
}
- public int WindowSize
- {
- get { return windowSize; }
- set { this.windowSize = value; }
- }
-
///
/// <summery>
/// Return an answer of true to the isProducerInfo() query.
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/RemoveInfo.cs Wed Dec 9 15:55:13 2009
@@ -23,7 +23,6 @@
public class RemoveInfo : BaseCommand
{
DataStructure objectId;
- long lastDeliveredSequenceId;
///
/// <summery>
@@ -35,7 +34,6 @@
{
return GetType().Name + "[" +
"ObjectId=" + ObjectId +
- "LastDeliveredSequenceId=" + LastDeliveredSequenceId +
"]";
}
@@ -50,12 +48,6 @@
set { this.objectId = value; }
}
- public long LastDeliveredSequenceId
- {
- get { return lastDeliveredSequenceId; }
- set { this.lastDeliveredSequenceId = value; }
- }
-
///
/// <summery>
/// Return an answer of true to the isRemoveInfo() query.
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TextMessage.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TextMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/TextMessage.cs Wed Dec 9 15:55:13 2009
@@ -67,13 +67,6 @@
{
try
{
- if(this.text == null && this.Content != null)
- {
- Stream stream = new MemoryStream(this.Content);
- EndianBinaryReader reader = new EndianBinaryReader(stream);
- this.text = reader.ReadString32();
- this.Content = null;
- }
return this.text;
}
catch(IOException ex)
@@ -96,17 +89,7 @@
if(this.Content == null && text != null)
{
- byte[] data = null;
-
- // Set initial size to the size of the string the UTF-8 encode could
- // result in more if there are chars that encode to multibye values.
- MemoryStream buffer = new MemoryStream(text.Length);
- EndianBinaryWriter writer = new EndianBinaryWriter(buffer);
- writer.WriteString32(text);
- buffer.Close();
- data = buffer.ToArray();
-
- this.Content = data;
+ this.Content = wireFormat.Encoder.GetBytes(this.text);
this.text = null;
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Wed Dec 9 15:55:13 2009
@@ -36,14 +36,12 @@
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private TimeSpan requestTimeout;
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
- private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
private readonly object myLock = new object();
private bool asyncSend = false;
private bool alwaysSyncSend = false;
private bool asyncClose = true;
private bool copyMessageOnSend = true;
- private int producerWindowSize = 0;
private bool connected = false;
private bool closed = false;
private bool closing = false;
@@ -124,19 +122,6 @@
}
/// <summary>
- /// This property is the maximum number of bytes in memory that a producer will transmit
- /// to a broker before waiting for acknowledgement messages from the broker that it has
- /// accepted the previously sent messages. In other words, this how you configure the
- /// producer flow control window that is used for async sends where the client is responsible
- /// for managing memory usage. The default value of 0 means no flow control at the client
- /// </summary>
- public int ProducerWindowSize
- {
- get { return producerWindowSize; }
- set { producerWindowSize = value; }
- }
-
- /// <summary>
/// This property forces all messages that are sent to be sent synchronously overriding
/// any usage of the AsyncSend flag. This can reduce performance in some cases since the
/// only messages we normally send synchronously are Persistent messages not sent in a
@@ -320,16 +305,6 @@
this.dispatchers.Remove( id );
}
- internal void addProducer( ProducerId id, MessageProducer producer )
- {
- this.producers.Add( id, producer );
- }
-
- internal void removeProducer( ProducerId id )
- {
- this.producers.Remove( id );
- }
-
public void Close()
{
lock(myLock)
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Wed Dec 9 15:55:13 2009
@@ -52,7 +52,6 @@
private int maximumRedeliveryCount = 10;
private int redeliveryTimeout = 500;
protected bool disposed = false;
- private long lastDeliveredSequenceId = 0;
private int deliveredCounter = 0;
private int additionalWindowSize = 0;
private long redeliveryDelay = 0;
@@ -82,11 +81,6 @@
#region Property Accessors
- public long LastDeliveredSequenceId
- {
- get { return this.lastDeliveredSequenceId; }
- }
-
public ConsumerId ConsumerId
{
get { return info.ConsumerId; }
@@ -273,11 +267,10 @@
}
this.unconsumedMessages.Close();
- this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+ this.session.DisposeOf(this.info.ConsumerId);
RemoveInfo removeCommand = new RemoveInfo();
removeCommand.ObjectId = this.info.ConsumerId;
- removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
this.session.Connection.Oneway(removeCommand);
this.session = null;
@@ -603,8 +596,6 @@
public void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
- this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
-
if(!IsAutoAcknowledgeBatch)
{
lock(this.dispatchedMessages)
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs Wed Dec 9 15:55:13 2009
@@ -94,9 +94,6 @@
}
DoClose();
- RemoveInfo removeInfo = new RemoveInfo();
- removeInfo.ObjectId = this.info.ProducerId;
- this.session.Connection.Oneway(removeInfo);
this.session = null;
}
}
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs?rev=888848&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs Wed Dec 9 15:55:13 2009
@@ -0,0 +1,232 @@
+// /*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+using Apache.NMS;
+
+namespace Apache.NMS.Stomp.Protocol
+{
+ public class StompFrame
+ {
+ /// Used to terminate a header line or end of a headers section of the Frame.
+ public const String NEWLINE = "\n";
+ /// Used to seperate the Key / Value pairing in Frame Headers
+ public const String SEPARATOR = ":";
+ /// Used to mark the End of the Frame.
+ public const byte FRAME_TERMINUS = (byte) 0;
+
+ private string commoand;
+ private IDictionary properties = new Hashtable();
+ private byte[] content;
+
+ private Encoding encoding = new UTF8Encoding();
+
+ public StompFrame()
+ {
+ }
+
+ public StompFrame(string command)
+ {
+ this.commoand = command;
+ }
+
+ public byte[] Content
+ {
+ get { return this.content; }
+ set { this.content = value; }
+ }
+
+ public string Command
+ {
+ get { return this.commoand; }
+ set { this.commoand = value; }
+ }
+
+ public IDictionary Properties
+ {
+ get { return this.properties; }
+ set { this.properties = value; }
+ }
+
+ public bool HasProperty(string name)
+ {
+ return this.properties.Contains(name);
+ }
+
+ public void SetProperty(string name, Object value)
+ {
+ this.Properties[name] = value.ToString();
+ }
+
+ public string GetProperty(string name)
+ {
+ return GetProperty(name, null);
+ }
+
+ public string GetProperty(string name, string fallback)
+ {
+ if(this.properties.Contains(name))
+ {
+ return this.properties[name] as string;
+ }
+
+ return fallback;
+ }
+
+ public string RemoveProperty(string name)
+ {
+ string result = null;
+
+ if(this.properties.Contains(name))
+ {
+ result = this.properties[name] as string;
+ this.properties.Remove(name);
+ }
+
+ return result;
+ }
+
+ public void ClearProperties()
+ {
+ this.properties.Clear();
+ }
+
+ public void ToStream(BinaryWriter dataOut)
+ {
+ StringBuilder builder = new StringBuilder();
+
+ builder.Append(this.Command);
+ builder.Append(NEWLINE);
+
+ foreach(String key in this.Properties.Keys)
+ {
+ builder.Append(key);
+ builder.Append(SEPARATOR);
+ builder.Append(this.Properties[key] as string);
+ builder.Append(NEWLINE);
+ }
+
+ dataOut.Write(this.encoding.GetBytes(builder.ToString()));
+
+ if(this.Content != null)
+ {
+ dataOut.Write(this.Content);
+ }
+
+ dataOut.Write(FRAME_TERMINUS);
+ }
+
+ public void FromStream(BinaryReader dataIn)
+ {
+ this.ReadCommandHeader(dataIn);
+ this.ReadHeaders(dataIn);
+ this.ReadContent(dataIn);
+ }
+
+ private void ReadCommandHeader(BinaryReader dataIn)
+ {
+ string command;
+ do
+ {
+ command = ReadLine(dataIn);
+ }
+ while(command == "");
+ }
+
+ private void ReadHeaders(BinaryReader dataIn)
+ {
+ string line;
+ while((line = ReadLine(dataIn)) != "")
+ {
+ int idx = line.IndexOf(':');
+
+ if(idx > 0)
+ {
+ string key = line.Substring(0, idx);
+ string value = line.Substring(idx + 1);
+
+ this.properties[key] = value;
+
+ Tracer.Debug("StompFrame - Read Header: " + key + " = " + value);
+ }
+ else
+ {
+ Tracer.Debug("StompFrame - Read Malformed Header: " + line);
+ }
+ }
+ }
+
+ private void ReadContent(BinaryReader dataIn)
+ {
+ if(this.properties.Contains("content-length"))
+ {
+ int size = Int32.Parse(this.properties["content-length"] as string);
+ this.content = dataIn.ReadBytes(size);
+
+ // Read the terminating NULL byte for this frame.
+ if(dataIn.Read() != 0)
+ {
+ Tracer.Debug("StompFrame - Error Invalid Frame, no trailing Null.");
+ }
+ }
+ else
+ {
+ MemoryStream ms = new MemoryStream();
+ int nextChar;
+ while((nextChar = dataIn.Read()) != 0)
+ {
+ // The first Null in this case marks the end of data.
+ if(nextChar < 0)
+ {
+ break;
+ }
+
+ ms.WriteByte((byte)nextChar);
+ }
+
+ this.content = ms.ToArray();
+ }
+ }
+
+ private String ReadLine(BinaryReader dataIn)
+ {
+ MemoryStream ms = new MemoryStream();
+
+ while(true)
+ {
+ int nextChar = dataIn.Read();
+ if(nextChar < 0)
+ {
+ throw new IOException("Peer closed the stream.");
+ }
+ if(nextChar == 10)
+ {
+ break;
+ }
+ ms.WriteByte((byte)nextChar);
+ }
+
+ byte[] data = ms.ToArray();
+ return encoding.GetString(data, 0, data.Length);
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompHelper.cs Wed Dec 9 15:55:13 2009
@@ -123,6 +123,22 @@
}
}
+ public static string ToStomp(AcknowledgementMode ackMode)
+ {
+ if(ackMode == AcknowledgementMode.ClientAcknowledge)
+ {
+ return "client";
+ }
+ else if(ackMode == AcknowledgementMode.IndividualAcknowledge)
+ {
+ return "client-individual";
+ }
+ else
+ {
+ return "auto";
+ }
+ }
+
public static string ToStomp(ConsumerId id)
{
return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs Wed Dec 9 15:55:13 2009
@@ -29,7 +29,7 @@
/// </summary>
public class StompWireFormat : IWireFormat
{
- private Encoding encoding = new UTF8Encoding();
+ private Encoding encoder = new UTF8Encoding();
private ITransport transport;
private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
@@ -48,143 +48,78 @@
get { return 1; }
}
- public void Marshal(Object o, BinaryWriter binaryWriter)
+ public Encoding Encoder
{
- Tracer.Debug(">>>> " + o);
- StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
+ get { return this.encoder; }
+ set { this.encoder = value; }
+ }
+
+ public void Marshal(Object o, BinaryWriter dataOut)
+ {
+ Tracer.Debug("StompWireFormat - Marshaling: " + o);
- if (o is ConnectionInfo)
+ if(o is ConnectionInfo)
{
- WriteConnectionInfo((ConnectionInfo) o, ds);
+ WriteConnectionInfo((ConnectionInfo) o, dataOut);
}
- else if (o is Message)
+ else if(o is Message)
{
- WriteMessage((Message) o, ds);
+ WriteMessage((Message) o, dataOut);
}
- else if (o is ConsumerInfo)
+ else if(o is ConsumerInfo)
{
- WriteConsumerInfo((ConsumerInfo) o, ds);
+ WriteConsumerInfo((ConsumerInfo) o, dataOut);
}
- else if (o is MessageAck)
+ else if(o is MessageAck)
{
- WriteMessageAck((MessageAck) o, ds);
+ WriteMessageAck((MessageAck) o, dataOut);
}
- else if (o is TransactionInfo)
+ else if(o is TransactionInfo)
{
- WriteTransactionInfo((TransactionInfo) o, ds);
+ WriteTransactionInfo((TransactionInfo) o, dataOut);
}
- else if (o is ShutdownInfo)
+ else if(o is ShutdownInfo)
{
- WriteShutdownInfo((ShutdownInfo) o, ds);
+ WriteShutdownInfo((ShutdownInfo) o, dataOut);
}
- else if (o is RemoveInfo)
+ else if(o is RemoveInfo)
{
- WriteRemoveInfo((RemoveInfo) o, ds);
+ WriteRemoveInfo((RemoveInfo) o, dataOut);
}
- else if (o is Command)
+ else if(o is Command)
{
Command command = o as Command;
- if (command.ResponseRequired)
+ if(command.ResponseRequired)
{
Response response = new Response();
response.CorrelationId = command.CommandId;
SendCommand(response);
- Tracer.Debug("#### Autorespond to command: " + o.GetType());
+ Tracer.Debug("StompWireFormat - Autorespond to command: " + o.GetType());
}
}
else
{
- Tracer.Debug("#### Ignored command: " + o.GetType());
- }
- }
-
-
- internal String ReadLine(BinaryReader dis)
- {
- MemoryStream ms = new MemoryStream();
- while (true)
- {
- int nextChar = dis.Read();
- if (nextChar < 0)
- {
- throw new IOException("Peer closed the stream.");
- }
- if( nextChar == 10 )
- {
- break;
- }
- ms.WriteByte((byte)nextChar);
+ Tracer.Debug("StompWireFormat - Ignored command: " + o.GetType());
}
- byte[] data = ms.ToArray();
- return encoding.GetString(data, 0, data.Length);
}
- public Object Unmarshal(BinaryReader dis)
+ public Object Unmarshal(BinaryReader dataIn)
{
- string command;
- do {
- command = ReadLine(dis);
- }
- while (command == "");
-
- Tracer.Debug("<<<< command: " + command);
-
- IDictionary headers = new Hashtable();
- string line;
- while ((line = ReadLine(dis)) != "")
- {
- int idx = line.IndexOf(':');
- if (idx > 0)
- {
- string key = line.Substring(0, idx);
- string value = line.Substring(idx + 1);
- headers[key] = value;
-
- Tracer.Debug("<<<< header: " + key + " = " + value);
- }
- else
- {
- // lets ignore this bad header!
- }
- }
- byte[] content = null;
- string length = ToString(headers["content-length"]);
- if (length != null)
- {
- int size = Int32.Parse(length);
- content = dis.ReadBytes(size);
- // Read the terminating NULL byte for this frame.
- int nullByte = dis.Read();
- if(nullByte != 0)
- {
- Tracer.Debug("<<<< error reading frame null byte.");
- }
- }
- else
- {
- MemoryStream ms = new MemoryStream();
- int nextChar;
- while((nextChar = dis.Read()) != 0)
- {
- if( nextChar < 0 )
- {
- // EOF ??
- break;
- }
- ms.WriteByte((byte)nextChar);
- }
- content = ms.ToArray();
- }
- Object answer = CreateCommand(command, headers, content);
- Tracer.Debug("<<<< received: " + answer);
+ StompFrame frame = new StompFrame();
+ frame.FromStream(dataIn);
+
+ Object answer = CreateCommand(frame);
+ Tracer.Debug("StompWireFormat - Command received: " + answer);
return answer;
}
- protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
+ protected virtual Object CreateCommand(StompFrame frame)
{
+ string command = frame.Command;
+
if(command == "RECEIPT" || command == "CONNECTED")
{
- string text = RemoveHeader(headers, "receipt-id");
+ string text = frame.RemoveProperty("receipt-id");
if(text != null)
{
Response answer = new Response();
@@ -198,8 +133,8 @@
}
else if(command == "CONNECTED")
{
- text = RemoveHeader(headers, "response-id");
- if (text != null)
+ text = frame.RemoveProperty("response-id");
+ if(text != null)
{
Response answer = new Response();
answer.CorrelationId = Int32.Parse(text);
@@ -209,7 +144,7 @@
}
else if(command == "ERROR")
{
- string text = RemoveHeader(headers, "receipt-id");
+ string text = frame.RemoveProperty("receipt-id");
if(text != null && text.StartsWith("ignore:"))
{
@@ -226,146 +161,169 @@
}
BrokerError error = new BrokerError();
- error.Message = RemoveHeader(headers, "message");
- error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
- // TODO is this the right header?
+ error.Message = frame.RemoveProperty("message");
answer.Exception = error;
return answer;
}
}
- else if (command == "MESSAGE")
+ else if(command == "MESSAGE")
{
- return ReadMessage(command, headers, content);
+ return CreateMessage(frame);
}
- Tracer.Error("Unknown command: " + command + " headers: " + headers);
+
+ Tracer.Error("Unknown command: " + frame.Command + " headers: " + frame.Properties);
+
return null;
}
- protected virtual Command ReadMessage(string command, IDictionary headers, byte[] content)
+ protected virtual Command CreateMessage(StompFrame frame)
{
Message message = null;
- if (headers.Contains("content-length"))
+ if(frame.HasProperty("content-length"))
{
message = new BytesMessage();
- message.Content = content;
+ message.Content = frame.Content;
}
else
{
- message = new TextMessage(encoding.GetString(content, 0, content.Length));
+ message = new TextMessage(encoder.GetString(frame.Content, 0, frame.Content.Length));
}
- // TODO now lets set the various headers
-
- message.Type = RemoveHeader(headers, "type");
- message.Destination = StompHelper.ToDestination(RemoveHeader(headers, "destination"));
- message.ReplyTo = StompHelper.ToDestination(RemoveHeader(headers, "reply-to"));
- message.TargetConsumerId = StompHelper.ToConsumerId(RemoveHeader(headers, "subscription"));
- message.CorrelationId = RemoveHeader(headers, "correlation-id");
- message.MessageId = StompHelper.ToMessageId(RemoveHeader(headers, "message-id"));
- message.Persistent = StompHelper.ToBool(RemoveHeader(headers, "persistent"), true);
-
- string header = RemoveHeader(headers, "priority");
- if (header != null) message.Priority = Byte.Parse(header);
+ message.Type = frame.RemoveProperty("type");
+ message.Destination = StompHelper.ToDestination(frame.RemoveProperty("destination"));
+ message.ReplyTo = StompHelper.ToDestination(frame.RemoveProperty("reply-to"));
+ message.TargetConsumerId = StompHelper.ToConsumerId(frame.RemoveProperty("subscription"));
+ message.CorrelationId = frame.RemoveProperty("correlation-id");
+ message.MessageId = StompHelper.ToMessageId(frame.RemoveProperty("message-id"));
+ message.Persistent = StompHelper.ToBool(frame.RemoveProperty("persistent"), true);
- header = RemoveHeader(headers, "timestamp");
- if (header != null) message.Timestamp = Int64.Parse(header);
-
- header = RemoveHeader(headers, "expires");
- if (header != null) message.Expiration = Int64.Parse(header);
+ if(frame.HasProperty("priority"))
+ {
+ message.Priority = Byte.Parse(frame.RemoveProperty("priority"));
+ }
+
+ if(frame.HasProperty("timestamp"))
+ {
+ message.Timestamp = Int64.Parse(frame.RemoveProperty("timestamp"));
+ }
+
+ if(frame.HasProperty("expires"))
+ {
+ message.Expiration = Int64.Parse(frame.RemoveProperty("expires"));
+ }
// now lets add the generic headers
- foreach (string key in headers.Keys)
+ foreach(string key in frame.Properties.Keys)
{
- Object value = headers[key];
- if (value != null)
+ Object value = frame.Properties[key];
+ if(value != null)
{
// lets coerce some standard header extensions
- if (key == "NMSXGroupSeq")
+ if(key == "NMSXGroupSeq")
{
value = Int32.Parse(value.ToString());
}
}
message.Properties[key] = value;
}
+
MessageDispatch dispatch = new MessageDispatch();
dispatch.Message = message;
dispatch.ConsumerId = message.TargetConsumerId;
dispatch.Destination = message.Destination;
+
return dispatch;
}
- protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
+ protected virtual void WriteConnectionInfo(ConnectionInfo command, BinaryWriter dataOut)
{
- // lets force a receipt
- command.ResponseRequired = true;
+ // lets force a receipt for the Connect Frame.
+
+ StompFrame frame = new StompFrame("CONNECT");
- ss.WriteCommand(command, "CONNECT");
- ss.WriteHeader("client-id", command.ClientId);
- ss.WriteHeader("login", command.UserName);
- ss.WriteHeader("passcode", command.Password);
-
- if (command.ResponseRequired)
- {
- ss.WriteHeader("request-id", command.CommandId);
- }
+ frame.SetProperty("client-id", command.ClientId);
+ frame.SetProperty("login", command.UserName);
+ frame.SetProperty("passcode", command.Password);
+ frame.SetProperty("request-id", command.CommandId.ToString());
- ss.Flush();
+ frame.ToStream(dataOut);
}
- protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+ protected virtual void WriteShutdownInfo(ShutdownInfo command, BinaryWriter dataOut)
{
- ss.WriteCommand(command, "DISCONNECT");
System.Diagnostics.Debug.Assert(!command.ResponseRequired);
- ss.Flush();
+
+ new StompFrame("DISCONNECT").ToStream(dataOut);
}
- protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
+ protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut)
{
- ss.WriteCommand(command, "SUBSCRIBE");
- ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
- ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
- ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
- ss.WriteHeader("selector", command.Selector);
- if ( command.NoLocal )
- ss.WriteHeader("no-local", command.NoLocal);
- ss.WriteHeader("ack", "client");
+ StompFrame frame = new StompFrame("SUBSCRIBE");
+
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", command.CommandId);
+ }
+
+ frame.SetProperty("destination", StompHelper.ToStomp(command.Destination));
+ frame.SetProperty("id", StompHelper.ToStomp(command.ConsumerId));
+ frame.SetProperty("durable-subscriber-name", command.SubscriptionName);
+ frame.SetProperty("selector", command.Selector);
+ frame.SetProperty("ack", StompHelper.ToStomp(command.AckMode));
+
+ if(command.NoLocal)
+ {
+ frame.SetProperty("no-local", command.NoLocal.ToString());
+ }
// ActiveMQ extensions to STOMP
- ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
- if ( command.Exclusive )
- ss.WriteHeader("activemq.exclusive", command.Exclusive);
+ frame.SetProperty("activemq.dispatchAsync", command.DispatchAsync);
+
+ if(command.Exclusive)
+ {
+ frame.SetProperty("activemq.exclusive", command.Exclusive);
+ }
- if( command.SubscriptionName != null )
+ if(command.SubscriptionName != null)
{
- ss.WriteHeader("activemq.subscriptionName", command.SubscriptionName);
+ frame.SetProperty("activemq.subscriptionName", command.SubscriptionName);
// For an older 4.0 broker we need to set this header so they get the
- // subscription as wel..
- ss.WriteHeader("activemq.subcriptionName", command.SubscriptionName);
+ // subscription as well..
+ frame.SetProperty("activemq.subcriptionName", command.SubscriptionName);
}
- ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
- ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
- ss.WriteHeader("activemq.priority", command.Priority);
- if ( command.Retroactive )
- ss.WriteHeader("activemq.retroactive", command.Retroactive);
+ frame.SetProperty("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
+ frame.SetProperty("activemq.prefetchSize", command.PrefetchSize);
+ frame.SetProperty("activemq.priority", command.Priority);
+
+ if(command.Retroactive)
+ {
+ frame.SetProperty("activemq.retroactive", command.Retroactive);
+ }
+ // TODO - The Session should do this one.
consumers[command.ConsumerId] = command.ConsumerId;
- ss.Flush();
+
+ frame.ToStream(dataOut);
}
- protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+ protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
{
+ StompFrame frame = new StompFrame("UNSUBSCRIBE");
object id = command.ObjectId;
- if (id is ConsumerId)
+ if(id is ConsumerId)
{
ConsumerId consumerId = id as ConsumerId;
- ss.WriteCommand(command, "UNSUBSCRIBE");
- ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
- ss.Flush();
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", command.CommandId);
+ }
+ frame.SetProperty("id", StompHelper.ToStomp(consumerId));
+ frame.ToStream(dataOut);
consumers.Remove(consumerId);
}
- else if (id is SessionId)
+ else if(id is SessionId)
{
// When a session is removed, it needs to remove it's consumers too.
// Find all the consumer that were part of the session.
@@ -383,134 +341,149 @@
bool unsubscribedConsumer = false;
// Un-subscribe them.
- foreach (ConsumerId consumerId in matches)
+ foreach(ConsumerId consumerId in matches)
{
- ss.WriteCommand(command, "UNSUBSCRIBE");
- ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
- ss.Flush();
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", command.CommandId);
+ }
+ frame.SetProperty("id", StompHelper.ToStomp(consumerId));
+ frame.ToStream(dataOut);
consumers.Remove(consumerId);
unsubscribedConsumer = true;
}
if(!unsubscribedConsumer && command.ResponseRequired)
{
- ss.WriteCommand(command, "UNSUBSCRIBE", true);
- ss.WriteHeader("id", sessionId);
- ss.Flush();
- }
- }
- else if(id is ProducerId)
- {
- if(command.ResponseRequired)
- {
- ss.WriteCommand(command, "UNSUBSCRIBE", true);
- ss.WriteHeader("id", id);
- ss.Flush();
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", "ignore:" + command.CommandId);
+ }
+ frame.SetProperty("id", sessionId);
+ frame.ToStream(dataOut);
}
}
else if(id is ConnectionId)
{
if(command.ResponseRequired)
{
- ss.WriteCommand(command, "UNSUBSCRIBE", true);
- ss.WriteHeader("id", id);
- ss.Flush();
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", "ignore:" + command.CommandId);
+ }
+ frame.SetProperty("id", id);
+ frame.ToStream(dataOut);
}
}
}
-
- protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
+ protected virtual void WriteTransactionInfo(TransactionInfo command, BinaryWriter dataOut)
{
- TransactionId id = command.TransactionId;
- if (id is TransactionId)
+ string type = "BEGIN";
+ TransactionType transactionType = (TransactionType) command.Type;
+ switch(transactionType)
{
- string type = "BEGIN";
- TransactionType transactionType = (TransactionType) command.Type;
- switch (transactionType)
- {
- case TransactionType.Commit:
- command.ResponseRequired = true;
- type = "COMMIT";
- break;
- case TransactionType.Rollback:
- command.ResponseRequired = true;
- type = "ABORT";
- break;
- }
-
- Tracer.Debug(">>> For transaction type: " + transactionType + " we are using command type: " + type);
- ss.WriteCommand(command, type);
- ss.WriteHeader("transaction", StompHelper.ToStomp(id));
- ss.Flush();
+ case TransactionType.Commit:
+ command.ResponseRequired = true;
+ type = "COMMIT";
+ break;
+ case TransactionType.Rollback:
+ command.ResponseRequired = true;
+ type = "ABORT";
+ break;
}
- }
- protected virtual void WriteMessage(Message command, StompFrameStream ss)
- {
- ss.WriteCommand(command, "SEND");
- ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
- if (command.ReplyTo != null)
- ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
- if (command.CorrelationId != null )
- ss.WriteHeader("correlation-id", command.CorrelationId);
- if (command.Expiration != 0)
- ss.WriteHeader("expires", command.Expiration);
- if (command.Priority != 4)
- ss.WriteHeader("priority", command.Priority);
- if (command.Type != null)
- ss.WriteHeader("type", command.Type);
- if (command.TransactionId!=null)
- ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
-
- ss.WriteHeader("persistent", command.Persistent);
+ Tracer.Debug("StompWireFormat - For transaction type: " + transactionType +
+ " we are using command type: " + type);
- // lets force the content to be marshalled
+ StompFrame frame = new StompFrame(type);
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", command.CommandId);
+ }
+
+ frame.SetProperty("transaction", StompHelper.ToStomp(command.TransactionId));
+ frame.ToStream(dataOut);
+ }
- command.BeforeMarshall(null);
- if (command is TextMessage)
+ protected virtual void WriteMessage(Message command, BinaryWriter dataOut)
+ {
+ StompFrame frame = new StompFrame("SEND");
+ if(command.ResponseRequired)
{
- TextMessage textMessage = command as TextMessage;
- ss.Content = encoding.GetBytes(textMessage.Text);
+ frame.SetProperty("receipt", command.CommandId);
}
- else
+
+ frame.SetProperty("destination", StompHelper.ToStomp(command.Destination));
+
+ if(command.ReplyTo != null)
{
- ss.Content = command.Content;
- if(null != command.Content)
- {
- ss.ContentLength = command.Content.Length;
- }
- else
- {
- ss.ContentLength = 0;
- }
+ frame.SetProperty("reply-to", StompHelper.ToStomp(command.ReplyTo));
+ }
+ if(command.CorrelationId != null )
+ {
+ frame.SetProperty("correlation-id", command.CorrelationId);
+ }
+ if(command.Expiration != 0)
+ {
+ frame.SetProperty("expires", command.Expiration);
+ }
+ if(command.Priority != 4)
+ {
+ frame.SetProperty("priority", command.Priority);
+ }
+ if(command.Type != null)
+ {
+ frame.SetProperty("type", command.Type);
+ }
+ if(command.TransactionId!=null)
+ {
+ frame.SetProperty("transaction", StompHelper.ToStomp(command.TransactionId));
}
+ frame.SetProperty("persistent", command.Persistent);
+
+ // Perform any Content Marshaling.
+ command.BeforeMarshall(this);
+
+ // Store the Marshaled Content.
+ frame.Content = command.Content;
+
+ if(command is BytesMessage && command.Content != null && command.Content.Length > 0)
+ {
+ frame.SetProperty("content-length", command.Content.Length);
+ }
+
+ // Marshal all properties to the Frame.
IPrimitiveMap map = command.Properties;
- foreach (string key in map.Keys)
+ foreach(string key in map.Keys)
{
- ss.WriteHeader(key, map[key]);
+ frame.SetProperty(key, map[key]);
}
- ss.Flush();
+
+ frame.ToStream(dataOut);
}
- protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
+ protected virtual void WriteMessageAck(MessageAck command, BinaryWriter dataOut)
{
- ss.WriteCommand(command, "ACK", true);
+ StompFrame frame = new StompFrame("SEND");
+ if(command.ResponseRequired)
+ {
+ frame.SetProperty("receipt", "ignore:" + command.CommandId);
+ }
- // TODO handle bulk ACKs?
- ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
+ frame.SetProperty("message-id", StompHelper.ToStomp(command.LastMessageId));
if(command.TransactionId != null)
{
- ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+ frame.SetProperty("transaction", StompHelper.ToStomp(command.TransactionId));
}
- ss.Flush();
+ frame.ToStream(dataOut);
}
protected virtual void SendCommand(Command command)
{
- if (transport == null)
+ if(transport == null)
{
Tracer.Fatal("No transport configured so cannot return command: " + command);
}
@@ -520,24 +493,9 @@
}
}
- protected virtual string RemoveHeader(IDictionary headers, string name)
- {
- object value = headers[name];
- if (value == null)
- {
- return null;
- }
- else
- {
- headers.Remove(name);
- return value.ToString();
- }
- }
-
-
protected virtual string ToString(object value)
{
- if (value != null)
+ if(value != null)
{
return value.ToString();
}
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Wed Dec 9 15:55:13 2009
@@ -50,7 +50,6 @@
private int consumerCounter;
private int producerCounter;
private long nextDeliveryId;
- private long lastDeliveredSequenceId;
private bool disposed = false;
private bool closed = false;
private bool closing = false;
@@ -260,7 +259,6 @@
// Make sure we attempt to inform the broker this Session is done.
RemoveInfo info = new RemoveInfo();
info.ObjectId = this.info.SessionId;
- info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
this.connection.Oneway(info);
this.connection = null;
this.closed = true;
@@ -290,8 +288,6 @@
foreach(MessageConsumer consumer in consumers.Values)
{
consumer.DoClose();
- this.lastDeliveredSequenceId =
- Math.Min(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId);
}
}
consumers.Clear();
@@ -358,9 +354,6 @@
throw;
}
- // Registered with Connection so it can process Producer Acks.
- connection.addProducer(producerId, producer);
-
return producer;
}
@@ -626,11 +619,9 @@
}
}
- public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId)
+ public void DisposeOf(ConsumerId objectId)
{
connection.removeDispatcher(objectId);
- this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId);
-
if(!this.closing)
{
consumers.Remove(objectId);
@@ -639,7 +630,6 @@
public void DisposeOf(ProducerId objectId)
{
- connection.removeProducer(objectId);
if(!this.closing)
{
producers.Remove(objectId);
@@ -691,7 +681,6 @@
id.Value = Interlocked.Increment(ref producerCounter);
answer.ProducerId = id;
answer.Destination = Destination.Transform(destination);
- answer.WindowSize = connection.ProducerWindowSize;
// If the destination contained a URI query, then use it to set public
// properties on the ProducerInfo
Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj?rev=888848&r1=888847&r2=888848&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp.csproj Wed Dec 9 15:55:13 2009
@@ -8,6 +8,7 @@
<ProjectGuid>{AA51947C-1370-44DC-8692-1C8EFC5945F7}</ProjectGuid>
<OutputType>Library</OutputType>
<AssemblyName>Apache.NMS.Stomp</AssemblyName>
+ <RootNamespace>Apache.NMS.Stomp</RootNamespace>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -109,6 +110,7 @@
<Compile Include="src\main\csharp\ConnectionClosedException.cs" />
<Compile Include="src\main\csharp\Commands\ConnectionError.cs" />
<Compile Include="src\main\csharp\Commands\RemoveSubscriptionInfo.cs" />
+ <Compile Include="src\main\csharp\Protocol\StompFrame.cs" />
</ItemGroup>
<ItemGroup>
<None Include="keyfile\NMSKey.snk" />
@@ -120,6 +122,7 @@
<Properties>
<Policies>
<TextStylePolicy FileWidth="120" TabWidth="4" TabsToSpaces="True" NoTabsAfterNonTabs="False" RemoveTrailingWhitespace="True" EolMarker="Native" />
+ <StandardHeader Text="/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
" inheritsSet="MITX11License" />
</Policies>
</Properties>
</MonoDevelop>