You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by to...@apache.org on 2007/05/18 02:51:15 UTC
svn commit: r539198 [2/2] - in /incubator/qpid/trunk/qpid: ./
dotnet/Qpid.Buffer.Tests/ dotnet/Qpid.Buffer/ dotnet/Qpid.Client.Tests/
dotnet/Qpid.Client.Tests/BrokerDetails/ dotnet/Qpid.Client.Tests/Channel/
dotnet/Qpid.Client.Tests/Messages/ dotnet/Qp...
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs Thu May 17 17:51:12 2007
@@ -25,93 +25,104 @@
namespace Qpid.Client.Message
{
- public class MessageFactoryRegistry
- {
- private readonly Hashtable _mimeToFactoryMap = new Hashtable();
-
- public void RegisterFactory(string mimeType, IMessageFactory mf)
- {
- if (mf == null)
- {
- throw new ArgumentNullException("Message factory");
- }
- if (mimeType == null)
- {
- throw new ArgumentNullException("mf");
- }
- _mimeToFactoryMap[mimeType] = mf;
- }
-
- public void DeregisterFactory(string mimeType)
- {
- _mimeToFactoryMap.Remove(mimeType);
- }
-
- /// <summary>
- /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
- /// concrete message type.
- /// </summary>
- /// <param name="messageNbr">the AMQ message id</param>
- /// <param name="redelivered">true if redelivered</param>
- /// <param name="contentHeader">the content header that was received</param>
- /// <param name="bodies">a list of ContentBody instances</param>
- /// <returns>the message.</returns>
- /// <exception cref="AMQException"/>
- /// <exception cref="QpidException"/>
- public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
- ContentHeaderBody contentHeader,
- IList bodies)
- {
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.Properties;
-
- if (properties.ContentType == null)
- {
- properties.ContentType = "";
- }
-
- IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[properties.ContentType];
- if (mf == null)
- {
- throw new AMQException("Unsupport MIME type of " + properties.ContentType);
- }
- else
- {
- return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
- }
- }
-
- public AbstractQmsMessage CreateMessage(string mimeType)
- {
- if (mimeType == null)
- {
- throw new ArgumentNullException("Mime type must not be null");
- }
- IMessageFactory mf = (IMessageFactory) _mimeToFactoryMap[mimeType];
- if (mf == null)
- {
- throw new AMQException("Unsupport MIME type of " + mimeType);
- }
- else
- {
- return mf.CreateMessage();
- }
- }
-
- /// <summary>
- /// Construct a new registry with the default message factories registered
- /// </summary>
- /// <returns>a message factory registry</returns>
- public static MessageFactoryRegistry NewDefaultRegistry()
- {
- MessageFactoryRegistry mf = new MessageFactoryRegistry();
- mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
- mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
- mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
- // TODO: use bytes message for default message factory
- // MJA - just added this bit back in...
- mf.RegisterFactory("", new QpidBytesMessageFactory());
- return mf;
- }
- }
+ public class MessageFactoryRegistry
+ {
+ private readonly Hashtable _mimeToFactoryMap = new Hashtable();
+ private IMessageFactory _defaultFactory;
+
+ /// <summary>
+ /// Default factory to use for unknown message types
+ /// </summary>
+ public IMessageFactory DefaultFactory
+ {
+ get { return _defaultFactory; }
+ set { _defaultFactory = value; }
+ }
+
+ /// <summary>
+ /// Register a new message factory for a MIME type
+ /// </summary>
+ /// <param name="mimeType">Mime type to register</param>
+ /// <param name="mf"></param>
+ public void RegisterFactory(string mimeType, IMessageFactory mf)
+ {
+ if ( mf == null )
+ throw new ArgumentNullException("mf");
+ if ( mimeType == null || mimeType.Length == 0 )
+ throw new ArgumentNullException("mimeType");
+
+ _mimeToFactoryMap[mimeType] = mf;
+ }
+
+ /// <summary>
+ /// Remove a message factory
+ /// </summary>
+ /// <param name="mimeType">MIME type to unregister</param>
+ public void DeregisterFactory(string mimeType)
+ {
+ _mimeToFactoryMap.Remove(mimeType);
+ }
+
+ /// <summary>
+ /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ /// concrete message type.
+ /// </summary>
+ /// <param name="messageNbr">the AMQ message id</param>
+ /// <param name="redelivered">true if redelivered</param>
+ /// <param name="contentHeader">the content header that was received</param>
+ /// <param name="bodies">a list of ContentBody instances</param>
+ /// <returns>the message.</returns>
+ /// <exception cref="AMQException"/>
+ /// <exception cref="QpidException"/>
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered,
+ ContentHeaderBody contentHeader,
+ IList bodies)
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties)contentHeader.Properties;
+
+ if ( properties.ContentType == null )
+ {
+ properties.ContentType = "";
+ }
+
+ IMessageFactory mf = GetFactory(properties.ContentType);
+ return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies);
+ }
+
+ /// <summary>
+ /// Create a new message of the specified type
+ /// </summary>
+ /// <param name="mimeType">The Mime type</param>
+ /// <returns>The new message</returns>
+ public AbstractQmsMessage CreateMessage(string mimeType)
+ {
+ if ( mimeType == null || mimeType.Length == 0 )
+ throw new ArgumentNullException("mimeType");
+
+ IMessageFactory mf = GetFactory(mimeType);
+ return mf.CreateMessage(mimeType);
+ }
+
+ /// <summary>
+ /// Construct a new registry with the default message factories registered
+ /// </summary>
+ /// <returns>a message factory registry</returns>
+ public static MessageFactoryRegistry NewDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.RegisterFactory("text/plain", new QpidTextMessageFactory());
+ mf.RegisterFactory("text/xml", new QpidTextMessageFactory());
+ mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory());
+
+ mf.DefaultFactory = new QpidBytesMessageFactory();
+ return mf;
+ }
+
+ private IMessageFactory GetFactory(string mimeType)
+ {
+ IMessageFactory mf = (IMessageFactory)_mimeToFactoryMap[mimeType];
+ return mf != null ? mf : _defaultFactory;
+ }
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs Thu May 17 17:51:12 2007
@@ -43,8 +43,6 @@
public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage
{
- private const string MIME_TYPE = "application/octet-stream";
-
private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
public QpidBytesMessage() : this(null)
@@ -59,7 +57,6 @@
QpidBytesMessage(ByteBuffer data) : base(data)
{
// superclass constructor has instantiated a content header at this point
- ContentHeaderProperties.ContentType = MIME_TYPE;
if (data == null)
{
_data = ByteBuffer.Allocate(DEFAULT_BUFFER_INITIAL_SIZE);
@@ -71,7 +68,6 @@
// TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
: base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data)
{
- ContentHeaderProperties.ContentType = MIME_TYPE;
}
public override void ClearBodyImpl()
@@ -113,14 +109,6 @@
byte[] data = new byte[_data.Remaining];
_data.GetBytes(data);
return Encoding.UTF8.GetString(data);
- }
- }
-
- public override string MimeType
- {
- get
- {
- return MIME_TYPE;
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs Thu May 17 17:51:12 2007
@@ -62,9 +62,11 @@
return new QpidBytesMessage(deliveryTag, contentHeader, data);
}
- public override AbstractQmsMessage CreateMessage()
+ public override AbstractQmsMessage CreateMessage(string mimeType)
{
- return new QpidBytesMessage();
+ QpidBytesMessage msg = new QpidBytesMessage();
+ msg.ContentType = mimeType;
+ return msg;
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs Thu May 17 17:51:12 2007
@@ -28,16 +28,10 @@
_headers.Clear();
}
- public string this[string name]
+ public object this[string name]
{
- get
- {
- return GetString(name);
- }
- set
- {
- SetString(name, value);
- }
+ get { return GetObject(name); }
+ set { SetObject(name, value); }
}
public bool GetBoolean(string name)
@@ -165,6 +159,18 @@
{
CheckPropertyName(propertyName);
_headers.SetString(propertyName, value);
+ }
+
+ public object GetObject(string propertyName)
+ {
+ CheckPropertyName(propertyName);
+ return _headers[propertyName];
+ }
+
+ public void SetObject(string propertyName, object value)
+ {
+ CheckPropertyName(propertyName);
+ _headers[propertyName] = value;
}
private static void CheckPropertyName(string propertyName)
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs Thu May 17 17:51:12 2007
@@ -28,25 +28,22 @@
{
public class QpidTextMessage : AbstractQmsMessage, ITextMessage
{
- private const string MIME_TYPE = "text/plain";
-
private string _decodedValue = null;
+ private static Encoding DEFAULT_ENCODING = Encoding.UTF8;
internal QpidTextMessage() : this(null, null)
{
+ ContentEncoding = DEFAULT_ENCODING.BodyName;
}
- QpidTextMessage(ByteBuffer data, String encoding) : base(data)
+ internal QpidTextMessage(ByteBuffer data, String encoding) : base(data)
{
- ContentHeaderProperties.ContentType = MIME_TYPE;
- ContentHeaderProperties.Encoding = encoding;
+ ContentEncoding = encoding;
}
internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
:base(deliveryTag, contentHeader, data)
{
- contentHeader.ContentType = MIME_TYPE;
- _data = data; // FIXME: Unnecessary - done in base class ctor.
}
public override void ClearBodyImpl()
@@ -64,14 +61,6 @@
return Text;
}
- public override string MimeType
- {
- get
- {
- return MIME_TYPE;
- }
- }
-
public string Text
{
get
@@ -100,7 +89,7 @@
}
else
{
- _decodedValue = Encoding.Default.GetString(bytes);
+ _decodedValue = DEFAULT_ENCODING.GetString(bytes);
}
return _decodedValue;
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs Thu May 17 17:51:12 2007
@@ -25,9 +25,11 @@
{
public class QpidTextMessageFactory : AbstractQmsMessageFactory
{
- public override AbstractQmsMessage CreateMessage()
+ public override AbstractQmsMessage CreateMessage(string mimeType)
{
- return new QpidTextMessage();
+ QpidTextMessage msg = new QpidTextMessage();
+ msg.ContentType = mimeType;
+ return msg;
}
protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader)
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs Thu May 17 17:51:12 2007
@@ -43,7 +43,7 @@
Bodies.Add(body);
if (body.Payload != null)
{
- _bytesReceived += (uint)body.Payload.Length;
+ _bytesReceived += (uint)body.Payload.Remaining;
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/QpidConnectionInfo.cs Thu May 17 17:51:12 2007
@@ -171,19 +171,10 @@
sb.Append('?');
foreach (String key in options.Keys)
{
- sb.Append(key);
-
- sb.Append("='");
-
- sb.Append(options[key]);
-
- sb.Append("'");
- sb.Append(DEFAULT_OPTION_SEPERATOR);
+ sb.AppendFormat("{0}='{1}'{2}", key, options[key], DEFAULT_OPTION_SEPERATOR);
}
sb.Remove(sb.Length - 1, 1);
- // sb.deleteCharAt(sb.length() - 1);
-
return sb.ToString();
}
}
@@ -358,9 +349,10 @@
public class QpidConnectionInfo : IConnectionInfo
{
+ const string DEFAULT_VHOST = "/";
string _username = "guest";
string _password = "guest";
- string _virtualHost = "/";
+ string _virtualHost = DEFAULT_VHOST;
string _failoverMethod = null;
IDictionary _failoverOptions = new Hashtable();
@@ -385,15 +377,51 @@
public string AsUrl()
{
- string result = "amqp://";
- foreach (IBrokerInfo info in _brokerInfos)
+ StringBuilder sb = new StringBuilder();
+ sb.AppendFormat("{0}://", ConnectionUrlConstants.AMQ_PROTOCOL);
+
+ if (_username != null)
{
- result += info.ToString();
+ sb.Append(_username);
+ if (_password != null)
+ {
+ sb.AppendFormat(":{0}", _password);
+ }
+ sb.Append("@");
}
- return result;
+ sb.Append(_clientName);
+ sb.Append(_virtualHost);
+ sb.Append(OptionsToString());
+
+ return sb.ToString();
}
+ private String OptionsToString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.AppendFormat("?{0}='", ConnectionUrlConstants.OPTIONS_BROKERLIST);
+
+ foreach (IBrokerInfo broker in _brokerInfos)
+ {
+ sb.AppendFormat("{0};", broker);
+ }
+
+ sb.Remove(sb.Length - 1, 1);
+ sb.Append("'");
+
+ if (_failoverMethod != null)
+ {
+ sb.AppendFormat("{0}{1}='{2}{3}'", URLHelper.DEFAULT_OPTION_SEPERATOR,
+ ConnectionUrlConstants.OPTIONS_FAILOVER,
+ _failoverMethod,
+ URLHelper.printOptions((Hashtable)_failoverOptions));
+ }
+
+ return sb.ToString();
+ }
+
+
public string FailoverMethod
{
get { return _failoverMethod; }
@@ -449,7 +477,13 @@
public string VirtualHost
{
get { return _virtualHost; }
- set { _virtualHost = value; }
+ set {
+ _virtualHost = value;
+ if ( _virtualHost == null || _virtualHost.Length == 0 )
+ _virtualHost = DEFAULT_VHOST;
+ if ( _virtualHost[0] != '/' )
+ _virtualHost = '/' + _virtualHost;
+ }
}
public string GetOption(string key)
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/State/AMQStateManager.cs Thu May 17 17:51:12 2007
@@ -65,6 +65,8 @@
IStateAwareMethodListener channelClose = new ChannelCloseMethodHandler();
IStateAwareMethodListener basicDeliver = new BasicDeliverMethodHandler();
IStateAwareMethodListener basicReturn = new BasicReturnMethodHandler();
+ IStateAwareMethodListener queueDeleteOk = new QueueDeleteOkMethodHandler();
+ IStateAwareMethodListener queuePurgeOk = new QueuePurgeOkMethodHandler();
// We need to register a map for the null (i.e. all state) handlers otherwise you get
// a stack overflow in the handler searching code when you present it with a frame for which
@@ -96,6 +98,8 @@
open[typeof(ConnectionCloseBody)] = connectionClose;
open[typeof(BasicDeliverBody)] = basicDeliver;
open[typeof(BasicReturnBody)] = basicReturn;
+ open[typeof(QueueDeleteOkBody)] = queueDeleteOk;
+ open[typeof(QueuePurgeOkBody)] = queuePurgeOk;
_state2HandlersMap[AMQState.CONNECTION_OPEN] = open;
}
{
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/AmqpChannel.cs Thu May 17 17:51:12 2007
@@ -33,35 +33,39 @@
// Warning: don't use this log for regular logging.
static readonly ILog _protocolTraceLog = LogManager.GetLogger("Qpid.Client.ProtocolChannel.Tracing");
- IByteChannel byteChannel;
- IProtocolEncoder encoder;
- IProtocolDecoder decoder;
-
- public AmqpChannel(IByteChannel byteChannel)
- {
- this.byteChannel = byteChannel;
+ IByteChannel _byteChannel;
+ IProtocolEncoder _encoder;
+ IProtocolDecoder _decoder;
+ IProtocolDecoderOutput _decoderOutput;
+ private object _syncLock;
+
+ public AmqpChannel(IByteChannel byteChannel, IProtocolDecoderOutput decoderOutput)
+ {
+ _byteChannel = byteChannel;
+ _decoderOutput = decoderOutput;
+ _syncLock = new object();
AMQProtocolProvider protocolProvider = new AMQProtocolProvider();
IProtocolCodecFactory factory = protocolProvider.CodecFactory;
- encoder = factory.Encoder;
- decoder = factory.Decoder;
+ _encoder = factory.Encoder;
+ _decoder = factory.Decoder;
}
- public Queue Read()
+ public void Read()
{
- ByteBuffer buffer = byteChannel.Read();
- return DecodeAndTrace(buffer);
+ ByteBuffer buffer = _byteChannel.Read();
+ Decode(buffer);
}
public IAsyncResult BeginRead(AsyncCallback callback, object state)
{
- return byteChannel.BeginRead(callback, state);
+ return _byteChannel.BeginRead(callback, state);
}
- public Queue EndRead(IAsyncResult result)
+ public void EndRead(IAsyncResult result)
{
- ByteBuffer buffer = byteChannel.EndRead(result);
- return DecodeAndTrace(buffer);
+ ByteBuffer buffer = _byteChannel.EndRead(result);
+ Decode(buffer);
}
public void Write(IDataBlock o)
@@ -74,43 +78,32 @@
// we should be doing an async write, but apparently
// the mentalis library doesn't queue async read/writes
// correctly and throws random IOException's. Stay sync for a while
- //byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
- byteChannel.Write(Encode(o));
+ //_byteChannel.BeginWrite(Encode(o), OnAsyncWriteDone, null);
+ _byteChannel.Write(Encode(o));
}
private void OnAsyncWriteDone(IAsyncResult result)
{
- byteChannel.EndWrite(result);
+ _byteChannel.EndWrite(result);
}
- private Queue DecodeAndTrace(ByteBuffer buffer)
+ private void Decode(ByteBuffer buffer)
{
- Queue frames = Decode(buffer);
-
- // TODO: Refactor to decorator.
- if ( _protocolTraceLog.IsDebugEnabled )
+ // make sure we don't try to decode more than
+ // one buffer at the same time
+ lock ( _syncLock )
{
- foreach ( object o in frames )
- {
- _protocolTraceLog.Debug(String.Format("READ {0}", o));
- }
+ _decoder.Decode(buffer, _decoderOutput);
}
- return frames;
}
private ByteBuffer Encode(object o)
{
SingleProtocolEncoderOutput output = new SingleProtocolEncoderOutput();
- encoder.Encode(o, output);
+ _encoder.Encode(o, output);
return output.buffer;
}
- private Queue Decode(ByteBuffer byteBuffer)
- {
- SimpleProtocolDecoderOutput outx = new SimpleProtocolDecoderOutput();
- decoder.Decode(byteBuffer, outx);
- return outx.MessageQueue;
- }
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/IProtocolChannel.cs Thu May 17 17:51:12 2007
@@ -25,8 +25,8 @@
{
public interface IProtocolChannel : IProtocolWriter
{
- Queue Read();
+ void Read();
IAsyncResult BeginRead(AsyncCallback callback, object state);
- Queue EndRead(IAsyncResult result);
+ void EndRead(IAsyncResult result);
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Transport/Socket/Blocking/BlockingSocketTransport.cs Thu May 17 17:51:12 2007
@@ -24,6 +24,7 @@
using System.Threading;
using Qpid.Client.Qms;
using Qpid.Client.Protocol;
+using Qpid.Codec;
using Qpid.Framing;
namespace Qpid.Client.Transport.Socket.Blocking
@@ -66,7 +67,11 @@
_ioHandler = MakeBrokerConnection(broker, connection);
// todo: get default read size from config!
- _amqpChannel = new AmqpChannel(new ByteChannel(_ioHandler));
+ IProtocolDecoderOutput decoderOutput =
+ new ProtocolDecoderOutput(_protocolListener);
+ _amqpChannel =
+ new AmqpChannel(new ByteChannel(_ioHandler), decoderOutput);
+
// post an initial async read
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), this);
}
@@ -117,22 +122,28 @@
{
try
{
- Queue frames = _amqpChannel.EndRead(result);
+ _amqpChannel.EndRead(result);
- // process results
- foreach ( IDataBlock dataBlock in frames )
- {
- _protocolListener.OnMessage(dataBlock);
- }
- // if we're not stopping, post a read again
bool stopping = _stopEvent.WaitOne(0, false);
if ( !stopping )
_amqpChannel.BeginRead(new AsyncCallback(OnAsyncReadDone), null);
} catch ( Exception e )
{
- _protocolListener.OnException(e);
+ // ignore any errors during closing
+ bool stopping = _stopEvent.WaitOne(0, false);
+ if ( !stopping )
+ _protocolListener.OnException(e);
}
}
+
+ #region IProtocolDecoderOutput Members
+
+ public void Write(object message)
+ {
+ _protocolListener.OnMessage((IDataBlock)message);
+ }
+
+ #endregion
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Qpid.Client.csproj Thu May 17 17:51:12 2007
@@ -50,6 +50,8 @@
<Compile Include="Client\AMQNoConsumersException.cs" />
<Compile Include="Client\AMQNoRouteException.cs" />
<Compile Include="Client\Configuration\AuthenticationConfigurationSectionHandler.cs" />
+ <Compile Include="Client\Handler\QueueDeleteOkMethodHandler.cs" />
+ <Compile Include="Client\Handler\QueuePurgeOkMethodHandler.cs" />
<Compile Include="Client\SslOptions.cs" />
<Compile Include="Client\Message\QpidHeaders.cs" />
<Compile Include="Client\QpidConnectionInfo.cs" />
@@ -110,6 +112,7 @@
<Compile Include="Client\Transport\IProtocolChannel.cs" />
<Compile Include="Client\Transport\IProtocolWriter.cs" />
<Compile Include="Client\Transport\ITransport.cs" />
+ <Compile Include="Client\Transport\ProtocolDecoderOutput.cs" />
<Compile Include="Client\Transport\SingleProtocolEncoderOutput.cs" />
<Compile Include="Client\Transport\Socket\Blocking\BlockingSocketTransport.cs" />
<Compile Include="Client\Transport\Socket\Blocking\ByteChannel.cs" />
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Codec/CumulativeProtocolDecoder.cs Thu May 17 17:51:12 2007
@@ -19,121 +19,134 @@
*
*/
using System;
+using log4net;
using Qpid.Buffer;
namespace Qpid.Codec
{
- public abstract class CumulativeProtocolDecoder : IProtocolDecoder
- {
- ByteBuffer _remaining;
-
- /// <summary>
- /// Creates a new instance with the 4096 bytes initial capacity of
- /// cumulative buffer.
- /// </summary>
- protected CumulativeProtocolDecoder()
- {
- _remaining = ByteBuffer.Allocate(4096);
- _remaining.IsAutoExpand = true;
- }
-
- /// <summary>
- /// Cumulates content of <tt>in</tt> into internal buffer and forwards
- /// decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- /// <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- /// and the cumulative buffer is compacted after decoding ends.
- /// </summary>
- /// <exception cref="Exception">
- /// if your <tt>doDecode()</tt> returned <tt>true</tt> not consuming the cumulative buffer.
- /// </exception>
- public void Decode(ByteBuffer input, IProtocolDecoderOutput output)
- {
- if (_remaining.Position != 0) // If there were remaining undecoded bytes
- {
- DecodeRemainingAndInput(input, output);
- }
- else
- {
- DecodeInput(input, output);
- }
- }
-
- private void DecodeInput(ByteBuffer input, IProtocolDecoderOutput output)
- {
- // Just decode the input buffer and remember any remaining undecoded bytes.
- try
- {
- DecodeAll(input, output);
- }
- finally
- {
- if (input.HasRemaining)
- {
- _remaining.Put(input);
- }
- }
- }
-
- private void DecodeRemainingAndInput(ByteBuffer input, IProtocolDecoderOutput output)
- {
- // Concatenate input buffer with left-over bytes.
- _remaining.Put(input);
- _remaining.Flip();
-
- try
- {
- DecodeAll(_remaining, output);
- }
- finally
- {
- _remaining.Compact();
- }
- }
-
- private void DecodeAll(ByteBuffer buf, IProtocolDecoderOutput output)
- {
- for (;;)
- {
- int oldPos = buf.Position;
- bool decoded = DoDecode(buf, output);
- if (decoded)
- {
- if (buf.Position == oldPos)
- {
- throw new Exception(
- "doDecode() can't return true when buffer is not consumed.");
- }
-
- if (!buf.HasRemaining)
- {
- break;
- }
- }
- else
- {
- break;
- }
- }
- }
-
- /// <summary>
- /// Implement this method to consume the specified cumulative buffer and
- /// decode its content into message(s).
- /// </summary>
- /// <param name="input">the cumulative buffer</param>
- /// <param name="output">decoder output</param>
- /// <returns>
- /// <tt>true</tt> if and only if there's more to decode in the buffer
- /// and you want to have <tt>doDecode</tt> method invoked again.
- /// Return <tt>false</tt> if remaining data is not enough to decode,
- /// then this method will be invoked again when more data is cumulated.
- /// </returns>
- /// <exception cref="Exception">If cannot decode</exception>
- protected abstract bool DoDecode(ByteBuffer input, IProtocolDecoderOutput output);
-
- public void Dispose()
- {
- _remaining = null;
- }
- }
+ public abstract class CumulativeProtocolDecoder : IProtocolDecoder
+ {
+ static ILog _logger = LogManager.GetLogger(typeof(CumulativeProtocolDecoder));
+
+ ByteBuffer _remaining;
+
+ /// <summary>
+ /// Creates a new instance with the 4096 bytes initial capacity of
+ /// cumulative buffer.
+ /// </summary>
+ protected CumulativeProtocolDecoder()
+ {
+ _remaining = AllocateBuffer();
+ }
+
+ /// <summary>
+ /// Cumulates content of <tt>in</tt> into internal buffer and forwards
+ /// decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ /// <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ /// and the cumulative buffer is compacted after decoding ends.
+ /// </summary>
+ /// <exception cref="Exception">
+ /// if your <tt>doDecode()</tt> returned <tt>true</tt> not consuming the cumulative buffer.
+ /// </exception>
+ public void Decode(ByteBuffer input, IProtocolDecoderOutput output)
+ {
+ if ( _remaining.Position != 0 ) // If there were remaining undecoded bytes
+ {
+ DecodeRemainingAndInput(input, output);
+ } else
+ {
+ DecodeInput(input, output);
+ }
+ }
+
+ private void DecodeInput(ByteBuffer input, IProtocolDecoderOutput output)
+ {
+ _logger.Debug(string.Format("DecodeInput: input {0}", input.Remaining));
+ // Just decode the input buffer and remember any remaining undecoded bytes.
+ try
+ {
+ DecodeAll(input, output);
+ } finally
+ {
+ if ( input.HasRemaining )
+ {
+ _remaining.Put(input);
+ }
+ }
+ }
+
+ private void DecodeRemainingAndInput(ByteBuffer input, IProtocolDecoderOutput output)
+ {
+ _logger.Debug(string.Format("DecodeRemainingAndInput: input {0}, remaining {1}", input.Remaining, _remaining.Position));
+ // replace the _remainder buffer, so that we can leave the
+ // original one alone. Necessary because some consumer splice
+ // the buffer and only consume it until later, causing
+ // a race condition if we compact it too soon.
+ ByteBuffer newRemainding = AllocateBuffer();
+ ByteBuffer temp = _remaining;
+ _remaining = newRemainding;
+ temp.Put(input);
+ temp.Flip();
+ try
+ {
+ DecodeAll(temp, output);
+ } finally
+ {
+ if ( temp.Remaining > 0 )
+ _remaining.Put(temp);
+ }
+ }
+
+ private void DecodeAll(ByteBuffer buf, IProtocolDecoderOutput output)
+ {
+ for ( ; ; )
+ {
+ int oldPos = buf.Position;
+ bool decoded = DoDecode(buf, output);
+ if ( decoded )
+ {
+ if ( buf.Position == oldPos )
+ {
+ throw new Exception(
+ "doDecode() can't return true when buffer is not consumed.");
+ }
+
+ if ( !buf.HasRemaining )
+ {
+ break;
+ }
+ } else
+ {
+ break;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Implement this method to consume the specified cumulative buffer and
+ /// decode its content into message(s).
+ /// </summary>
+ /// <param name="input">the cumulative buffer</param>
+ /// <param name="output">decoder output</param>
+ /// <returns>
+ /// <tt>true</tt> if and only if there's more to decode in the buffer
+ /// and you want to have <tt>doDecode</tt> method invoked again.
+ /// Return <tt>false</tt> if remaining data is not enough to decode,
+ /// then this method will be invoked again when more data is cumulated.
+ /// </returns>
+ /// <exception cref="Exception">If cannot decode</exception>
+ protected abstract bool DoDecode(ByteBuffer input, IProtocolDecoderOutput output);
+
+ public void Dispose()
+ {
+ _remaining = null;
+ }
+
+ private ByteBuffer AllocateBuffer()
+ {
+ ByteBuffer buffer = ByteBuffer.Allocate(4096);
+ buffer.IsAutoExpand = true;
+ return buffer;
+ }
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/BasicContentHeaderProperties.cs Thu May 17 17:51:12 2007
@@ -25,144 +25,266 @@
namespace Qpid.Framing
{
- public class BasicContentHeaderProperties : IContentHeaderProperties
- {
- private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties));
+ public class BasicContentHeaderProperties : IContentHeaderProperties
+ {
+ private static readonly ILog _log = LogManager.GetLogger(typeof(BasicContentHeaderProperties));
+
+ private string _contentType;
+ private string _encoding;
+ private FieldTable _headers;
+ private byte _deliveryMode;
+ private byte _priority;
+ private string _correlationId;
+ private long _expiration;
+ private string _replyTo;
+ private string _messageId;
+ private ulong _timestamp;
+ private string _type;
+ private string _userId;
+ private string _appId;
+ private string _clusterId;
+
+
+ #region Properties
+ //
+ // Properties
+ //
+
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ public string ContentType
+ {
+ get { return _contentType; }
+ set { _contentType = value; }
+ }
+
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ public string Encoding
+ {
+ get { return _encoding; }
+ set { _encoding = value; }
+ }
+
+ /// <summary>
+ /// Message headers
+ /// </summary>
+ public FieldTable Headers
+ {
+ get { return _headers; }
+ set { _headers = value; }
+ }
+
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ public byte DeliveryMode
+ {
+ get { return _deliveryMode; }
+ set { _deliveryMode = value; }
+ }
+
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ public byte Priority
+ {
+ get { return _priority; }
+ set { _priority = value; }
+ }
+
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ public string CorrelationId
+ {
+ get { return _correlationId; }
+ set { _correlationId = value; }
+ }
+
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ // TODO: Should be string according to spec
+ public long Expiration
+ {
+ get { return _expiration; }
+ set { _expiration = value; }
+ }
+
+ /// <summary>
+ /// The destination to reply to
+ /// </summary>
+ public string ReplyTo
+ {
+ get { return _replyTo; }
+ set { _replyTo = value; }
+ }
+
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ public string MessageId
+ {
+ get { return _messageId; }
+ set { _messageId = value; }
+ }
+
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
+ public ulong Timestamp
+ {
+ get { return _timestamp; }
+ set { _timestamp = value; }
+ }
+
+ /// <summary>
+ /// The message type name
+ /// </summary>
+ public string Type
+ {
+ get { return _type; }
+ set { _type = value; }
+ }
+
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ public string UserId
+ {
+ get { return _userId; }
+ set { _userId = value; }
+ }
+
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ public string AppId
+ {
+ get { return _appId; }
+ set { _appId = value; }
+ }
+
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ public string ClusterId
+ {
+ get { return _clusterId; }
+ set { _clusterId = value; }
+ }
+
+ #endregion // Properties
+
+
+ public BasicContentHeaderProperties()
+ {
+ }
+
+ public uint PropertyListSize
+ {
+ get
+ {
+ return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) +
+ EncodingUtils.EncodedShortStringLength(Encoding) +
+ EncodingUtils.EncodedFieldTableLength(Headers) +
+ 1 + 1 +
+ EncodingUtils.EncodedShortStringLength(CorrelationId) +
+ EncodingUtils.EncodedShortStringLength(ReplyTo) +
+ EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) +
+ EncodingUtils.EncodedShortStringLength(MessageId) +
+ 8 +
+ EncodingUtils.EncodedShortStringLength(Type) +
+ EncodingUtils.EncodedShortStringLength(UserId) +
+ EncodingUtils.EncodedShortStringLength(AppId) +
+ EncodingUtils.EncodedShortStringLength(ClusterId));
+
+ }
+ }
+
+ public ushort PropertyFlags
+ {
+ get
+ {
+ int value = 0;
- public string ContentType;
-
- public string Encoding;
-
- public FieldTable Headers;
-
- public byte DeliveryMode;
-
- public byte Priority;
-
- public string CorrelationId;
-
- public long Expiration;
-
- public string ReplyTo;
-
- public string MessageId;
-
- public ulong Timestamp;
-
- public string Type;
-
- public string UserId;
-
- public string AppId;
-
- public string ClusterId;
-
- public BasicContentHeaderProperties()
- {
- }
-
- public uint PropertyListSize
- {
- get
- {
- return (uint)(EncodingUtils.EncodedShortStringLength(ContentType) +
- EncodingUtils.EncodedShortStringLength(Encoding) +
- EncodingUtils.EncodedFieldTableLength(Headers) +
- 1 + 1 +
- EncodingUtils.EncodedShortStringLength(CorrelationId) +
- EncodingUtils.EncodedShortStringLength(ReplyTo) +
- EncodingUtils.EncodedShortStringLength(String.Format("{0:D}", Expiration)) +
- EncodingUtils.EncodedShortStringLength(MessageId) +
- 8 +
- EncodingUtils.EncodedShortStringLength(Type) +
- EncodingUtils.EncodedShortStringLength(UserId) +
- EncodingUtils.EncodedShortStringLength(AppId) +
- EncodingUtils.EncodedShortStringLength(ClusterId));
-
- }
- }
-
- public ushort PropertyFlags
- {
- get
- {
- int value = 0;
-
- // for now we just blast in all properties
- for (int i = 0; i < 14; i++)
- {
- value += (1 << (15-i));
- }
- return (ushort) value;
- }
- }
-
- public void WritePropertyListPayload(ByteBuffer buffer)
- {
- EncodingUtils.WriteShortStringBytes(buffer, ContentType);
- EncodingUtils.WriteShortStringBytes(buffer, Encoding);
- EncodingUtils.WriteFieldTableBytes(buffer, Headers);
- buffer.Put(DeliveryMode);
- buffer.Put(Priority);
- EncodingUtils.WriteShortStringBytes(buffer, CorrelationId);
- EncodingUtils.WriteShortStringBytes(buffer, ReplyTo);
- EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration));
- EncodingUtils.WriteShortStringBytes(buffer, MessageId);
- buffer.Put(Timestamp);
- EncodingUtils.WriteShortStringBytes(buffer, Type);
- EncodingUtils.WriteShortStringBytes(buffer, UserId);
- EncodingUtils.WriteShortStringBytes(buffer, AppId);
- EncodingUtils.WriteShortStringBytes(buffer, ClusterId);
- }
-
- public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags)
- {
- _log.Debug("Property flags: " + propertyFlags);
- if ((propertyFlags & (1 << 15)) > 0)
- ContentType = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 14)) > 0)
- Encoding = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 13)) > 0)
- Headers = EncodingUtils.ReadFieldTable(buffer);
- if ((propertyFlags & (1 << 12)) > 0)
- DeliveryMode = buffer.GetByte();
- if ((propertyFlags & (1 << 11)) > 0)
- Priority = buffer.GetByte();
- if ((propertyFlags & (1 << 10)) > 0)
- CorrelationId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 9)) > 0)
- ReplyTo = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 8)) > 0)
- Expiration = EncodingUtils.ReadLongAsShortString(buffer);
- if ((propertyFlags & (1 << 7)) > 0)
- MessageId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 6)) > 0)
- Timestamp = buffer.GetUInt64();
- if ((propertyFlags & (1 << 5)) > 0)
- Type = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 4)) > 0)
- UserId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 3)) > 0)
- AppId = EncodingUtils.ReadShortString(buffer);
- if ((propertyFlags & (1 << 2)) > 0)
- ClusterId = EncodingUtils.ReadShortString(buffer);
- }
-
- public void SetDeliveryMode(DeliveryMode deliveryMode)
- {
- if (deliveryMode == Messaging.DeliveryMode.NonPersistent)
+ // for now we just blast in all properties
+ for ( int i = 0; i < 14; i++ )
{
- DeliveryMode = 1;
+ value += (1 << (15 - i));
}
- else
- {
- DeliveryMode = 2;
- }
- }
-
- public override string ToString()
- {
- return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type;
- }
- }
+ return (ushort)value;
+ }
+ }
+
+ public void WritePropertyListPayload(ByteBuffer buffer)
+ {
+ EncodingUtils.WriteShortStringBytes(buffer, ContentType);
+ EncodingUtils.WriteShortStringBytes(buffer, Encoding);
+ EncodingUtils.WriteFieldTableBytes(buffer, Headers);
+ buffer.Put(DeliveryMode);
+ buffer.Put(Priority);
+ EncodingUtils.WriteShortStringBytes(buffer, CorrelationId);
+ EncodingUtils.WriteShortStringBytes(buffer, ReplyTo);
+ EncodingUtils.WriteShortStringBytes(buffer, String.Format("{0:D}", Expiration));
+ EncodingUtils.WriteShortStringBytes(buffer, MessageId);
+ buffer.Put(Timestamp);
+ EncodingUtils.WriteShortStringBytes(buffer, Type);
+ EncodingUtils.WriteShortStringBytes(buffer, UserId);
+ EncodingUtils.WriteShortStringBytes(buffer, AppId);
+ EncodingUtils.WriteShortStringBytes(buffer, ClusterId);
+ }
+
+ public void PopulatePropertiesFromBuffer(ByteBuffer buffer, ushort propertyFlags)
+ {
+ _log.Debug("Property flags: " + propertyFlags);
+ if ( (propertyFlags & (1 << 15)) > 0 )
+ ContentType = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 14)) > 0 )
+ Encoding = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 13)) > 0 )
+ Headers = EncodingUtils.ReadFieldTable(buffer);
+ if ( (propertyFlags & (1 << 12)) > 0 )
+ DeliveryMode = buffer.GetByte();
+ if ( (propertyFlags & (1 << 11)) > 0 )
+ Priority = buffer.GetByte();
+ if ( (propertyFlags & (1 << 10)) > 0 )
+ CorrelationId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 9)) > 0 )
+ ReplyTo = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 8)) > 0 )
+ Expiration = EncodingUtils.ReadLongAsShortString(buffer);
+ if ( (propertyFlags & (1 << 7)) > 0 )
+ MessageId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 6)) > 0 )
+ Timestamp = buffer.GetUInt64();
+ if ( (propertyFlags & (1 << 5)) > 0 )
+ Type = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 4)) > 0 )
+ UserId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 3)) > 0 )
+ AppId = EncodingUtils.ReadShortString(buffer);
+ if ( (propertyFlags & (1 << 2)) > 0 )
+ ClusterId = EncodingUtils.ReadShortString(buffer);
+ }
+
+ public void SetDeliveryMode(DeliveryMode deliveryMode)
+ {
+ if ( deliveryMode == Messaging.DeliveryMode.NonPersistent )
+ {
+ DeliveryMode = 1;
+ } else
+ {
+ DeliveryMode = 2;
+ }
+ }
+
+ public override string ToString()
+ {
+ return "Properties: " + ContentType + " " + Encoding + " " + Timestamp + " " + Type;
+ }
+ }
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/ContentBody.cs Thu May 17 17:51:12 2007
@@ -26,11 +26,24 @@
{
public const byte TYPE = 3;
- /// <summary>
- ///
- /// </summary>
- /// TODO: consider whether this should be a pointer into the ByteBuffer to avoid copying */
- public byte[] Payload;
+ private ByteBuffer _payload;
+
+ public ByteBuffer Payload
+ {
+ get { return _payload; }
+ }
+
+ public ContentBody()
+ {
+ }
+ public ContentBody(ByteBuffer payload)
+ {
+ PopulateFromBuffer(payload, (uint)payload.Remaining);
+ }
+ public ContentBody(ByteBuffer payload, uint length)
+ {
+ PopulateFromBuffer(payload, length);
+ }
#region IBody Members
@@ -46,7 +59,7 @@
{
get
{
- return (ushort)(Payload == null ? 0 : Payload.Length);
+ return (ushort)(Payload == null ? 0 : Payload.Remaining);
}
}
@@ -55,6 +68,7 @@
if (Payload != null)
{
buffer.Put(Payload);
+ Payload.Rewind();
}
}
@@ -62,8 +76,9 @@
{
if (size > 0)
{
- Payload = new byte[size];
- buffer.GetBytes(Payload);
+ _payload = buffer.Slice();
+ _payload.Limit = (int)size;
+ buffer.Skip((int)size);
}
}
@@ -75,6 +90,11 @@
frame.Channel = channelId;
frame.BodyFrame = body;
return frame;
+ }
+
+ public override string ToString()
+ {
+ return string.Format("ContentBody [ Size: {0} ]", Size);
}
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Framing/FieldTable.cs Thu May 17 17:51:12 2007
@@ -47,7 +47,6 @@
/// </summary>
/// <param name="buffer">the buffer from which to read data. The length byte must be read already</param>
/// <param name="length">the length of the field table. Must be > 0.</param>
- /// <exception cref="AMQFrameDecodingException">if there is an error decoding the table</exception>
public FieldTable(ByteBuffer buffer, uint length) : this()
{
_encodedForm = buffer.Slice();
@@ -497,27 +496,18 @@
private AMQTypedValue GetProperty(string name)
{
- lock ( _syncLock )
- {
- if ( _properties == null )
- {
- if ( _encodedForm == null )
- {
- return null;
- } else
- {
- PopulateFromBuffer();
- }
- }
- return (AMQTypedValue) _properties[name];
- }
+ InitMapIfNecessary();
+ return (AMQTypedValue) _properties[name];
}
private void PopulateFromBuffer()
{
try
{
- SetFromBuffer(_encodedForm, _encodedSize);
+ ByteBuffer buffer = _encodedForm;
+ _encodedForm = null;
+ if ( buffer != null )
+ SetFromBuffer(buffer, _encodedSize);
} catch ( AMQFrameDecodingException e )
{
_log.Error("Error decoding FieldTable in deferred decoding mode ", e);
@@ -598,7 +588,11 @@
{
if ( _encodedForm != null )
{
- buffer.Put(_encodedForm);
+ lock ( _syncLock )
+ {
+ buffer.Put(_encodedForm);
+ _encodedForm.Flip();
+ }
} else if ( _properties != null )
{
foreach ( DictionaryEntry de in _properties )
@@ -629,6 +623,7 @@
_log.Debug("Buffer Position:" + buffer.Position +
" Remaining:" + buffer.Remaining);
}
+ throw;
}
}
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs Thu May 17 17:51:12 2007
@@ -23,42 +23,135 @@
namespace Qpid.Messaging
{
public delegate void MessageReceivedDelegate(IMessage msg);
-
+
+ /// <summary>
+ /// Interface used to manipulate an AMQP channel.
+ /// </summary>
+ /// <remarks>
+ /// You can create a channel by using the CreateChannel() method
+ /// of the connection object.
+ /// </remarks>
public interface IChannel : IDisposable
{
+ /// <summary>
+ /// Acknowledge mode for messages received
+ /// </summary>
AcknowledgeMode AcknowledgeMode { get; }
+ /// <summary>
+ /// True if the channel should use transactions
+ /// </summary>
bool Transacted { get; }
/// <summary>
/// Prefetch value to be used as the default for consumers created on this channel.
/// </summary>
- int DefaultPrefetch
- {
- get;
- set;
- }
-
+ int DefaultPrefetch { get; set; }
+
+ /// <summary>
+ /// Declare a new exchange
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
+ /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param>
void DeclareExchange(string exchangeName, string exchangeClass);
+ /// <summary>
+ /// Declare a new exchange using the default exchange class
+ /// </summary>
+ /// <param name="exchangeName">Name of the exchange</param>
void DeleteExchange(string exchangeName);
+ /// <summary>
+ /// Declare a new queue with the specified set of arguments
+ /// </summary>
+ /// <param name="queueName">Name of the queue</param>
+ /// <param name="isDurable">True if the queue should be durable</param>
+ /// <param name="isExclusive">True if the queue should be exclusive to this channel</param>
+ /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param>
void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete);
- void DeleteQueue();
-
+ /// <summary>
+ /// Delete a queue with the specifies arguments
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param>
+ /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait);
+ /// <summary>
+ /// Generate a new Unique name to use for a queue
+ /// </summary>
+ /// <returns>A unique name to this channel</returns>
string GenerateUniqueName();
- IFieldTable CreateFieldTable();
+ /// <summary>
+ /// Removes all messages from a queue
+ /// </summary>
+ /// <param name="queueName">Name of the queue to delete</param>
+ /// <param name="noWait">If true, the server will not respond to the method</param>
+ void PurgeQueue(string queueName, bool noWait);
+
+ /// <summary>
+ /// Bind a queue to the specified exchange
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
void Bind(string queueName, string exchangeName, string routingKey);
+ /// <summary>
+ /// Bind a queue to the specified exchange
+ /// </summary>
+ /// <param name="queueName">Name of queue to bind</param>
+ /// <param name="exchangeName">Name of exchange to bind to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param>
void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args);
+ /// <summary>
+ /// Create a new empty message with no body
+ /// </summary>
+ /// <returns>The new message</returns>
IMessage CreateMessage();
+ /// <summary>
+ /// Create a new message of the specified MIME type
+ /// </summary>
+ /// <param name="mimeType">The mime type to create</param>
+ /// <returns>The new message</returns>
+ IMessage CreateMessage(string mimeType);
+ /// <summary>
+ /// Creates a new message for bytes (application/octet-stream)
+ /// </summary>
+ /// <returns>The new message</returns>
IBytesMessage CreateBytesMessage();
+ /// <summary>
+ /// Creates a new text message (text/plain) with empty content
+ /// </summary>
+ /// <returns>The new message</returns>
ITextMessage CreateTextMessage();
+ /// <summary>
+ /// Creates a new text message (text/plain) with a body
+ /// </summary>
+ /// <param name="initialValue">Initial body of the message</param>
+ /// <returns>The new message</returns>
ITextMessage CreateTextMessage(string initialValue);
#region Consuming
-
+
+ /// <summary>
+ /// Creates a new Consumer using the builder pattern
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <returns>The builder object</returns>
MessageConsumerBuilder CreateConsumerBuilder(string queueName);
+ /// <summary>
+ /// Creates a new consumer
+ /// </summary>
+ /// <param name="queueName">Name of queue to receive messages from</param>
+ /// <param name="prefetchLow">Low prefetch value</param>
+ /// <param name="prefetchHigh">High prefetch value</param>
+ /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param>
+ /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param>
+ /// <param name="durable">If true, create a durable subscription</param>
+ /// <param name="subscriptionName">Subscription name</param>
+ /// <returns>The new consumer</returns>
IMessageConsumer CreateConsumer(string queueName,
int prefetchLow,
int prefetchHigh,
@@ -66,15 +159,35 @@
bool exclusive,
bool durable,
string subscriptionName);
-
+
+ /// <summary>
+ /// Unsubscribe from a queue
+ /// </summary>
+ /// <param name="subscriptionName">Subscription name</param>
void Unsubscribe(string subscriptionName);
#endregion
#region Publishing
+ /// <summary>
+ /// Create a new message publisher using the builder pattern
+ /// </summary>
+ /// <returns>The builder object</returns>
MessagePublisherBuilder CreatePublisherBuilder();
-
+
+ /// <summary>
+ /// Create a new message publisher
+ /// </summary>
+ /// <param name="exchangeName">Name of exchange to publish to</param>
+ /// <param name="routingKey">Routing key</param>
+ /// <param name="deliveryMode">Default delivery mode</param>
+ /// <param name="timeToLive">Default TTL time of messages</param>
+ /// <param name="immediate">If true, sent immediately</param>
+ /// <param name="mandatory">If true, the broker will return an error
+ /// (as a connection exception) if the message cannot be delivered</param>
+ /// <param name="priority">Default message priority</param>
+ /// <returns>The new message publisher</returns>
IMessagePublisher CreatePublisher(string exchangeName,
string routingKey,
DeliveryMode deliveryMode,
@@ -86,9 +199,18 @@
#endregion
#region Transactions
-
+
+ /// <summary>
+ /// Recover after transaction failure
+ /// </summary>
void Recover();
+ /// <summary>
+ /// Commit the transaction
+ /// </summary>
void Commit();
+ /// <summary>
+ /// Rollback the transaction
+ /// </summary>
void Rollback();
#endregion
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IHeaders.cs Thu May 17 17:51:12 2007
@@ -35,7 +35,7 @@
{
bool Contains(string name);
- string this[string name] { get; set; }
+ object this[string name] { get; set; }
bool GetBoolean(string name);
void SetBoolean(string name, bool value);
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs?view=diff&rev=539198&r1=539197&r2=539198
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IMessage.cs Thu May 17 17:51:12 2007
@@ -22,25 +22,75 @@
{
public interface IMessage
{
- string ContentType { get; set;}
- string ContentEncoding { get; set; }
- string CorrelationId { get; set; }
- byte[] CorrelationIdAsBytes { get; set; }
- DeliveryMode DeliveryMode { get; set; }
- long Expiration { get; set; }
- string MessageId { get; set; }
- int Priority { get; set; }
+ /// <summary>
+ /// The MIME Content Type
+ /// </summary>
+ string ContentType { get; set;}
+ /// <summary>
+ /// The MIME Content Encoding
+ /// </summary>
+ string ContentEncoding { get; set; }
+ /// <summary>
+ /// The application correlation identifier
+ /// </summary>
+ string CorrelationId { get; set; }
+ /// <summary>
+ /// The application correlation identifier, as an array of bytes
+ /// </summary>
+ byte[] CorrelationIdAsBytes { get; set; }
+ /// <summary>
+ /// Non-persistent (1) or persistent (2)
+ /// </summary>
+ DeliveryMode DeliveryMode { get; set; }
+ /// <summary>
+ /// Message expiration specification
+ /// </summary>
+ long Expiration { get; set; }
+ /// <summary>
+ /// The application message identifier
+ /// </summary>
+ string MessageId { get; set; }
+ /// <summary>
+ /// The message priority, 0 to 9
+ /// </summary>
+ byte Priority { get; set; }
+ /// <summary>
+ /// True if the message has been redelivered
+ /// </summary>
bool Redelivered { get; set; }
+ /// <summary>
+ /// Exchange name of the reply-to address
+ /// </summary>
string ReplyToExchangeName { get; set; }
+ /// <summary>
+ /// Routing key of the reply-to address
+ /// </summary>
string ReplyToRoutingKey { get; set; }
+ /// <summary>
+ /// The message timestamp
+ /// </summary>
long Timestamp { get; set; }
+ /// <summary>
+ /// The message type name
+ /// </summary>
string Type { get; set; }
+ /// <summary>
+ /// Message headers
+ /// </summary>
IHeaders Headers { get; }
-
- // XXX: UserId?
- // XXX: AppId?
- // XXX: ClusterId?
-
+ /// <summary>
+ /// The creating user id
+ /// </summary>
+ string UserId { get; set; }
+ /// <summary>
+ /// The creating application id
+ /// </summary>
+ string AppId { get; set; }
+ /// <summary>
+ /// Intra-cluster routing identifier
+ /// </summary>
+ string ClusterId { get; set; }
+
void Acknowledge();
void ClearBody();
}