You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/07 19:39:19 UTC
[39/50] [abbrv] activemq-nms-msmq git commit: Apply patch for
AMQNET-554. Support for message properties,
and selectors. Thanks Stephane Ramet!
Apply patch for AMQNET-554. Support for message properties, and selectors. Thanks Stephane Ramet!
Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/16d8f06d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/16d8f06d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/16d8f06d
Branch: refs/heads/master
Commit: 16d8f06dd0b178c5030c0da7345aa16e2d51a761
Parents: 51ec9a2
Author: Jim Gomes <jg...@apache.org>
Authored: Thu Jul 7 20:47:10 2016 +0000
Committer: Jim Gomes <jg...@apache.org>
Committed: Thu Jul 7 20:47:10 2016 +0000
----------------------------------------------------------------------
src/main/csharp/BaseMessage.cs | 154 ++-
src/main/csharp/DefaultMessageConverter.cs | 500 +++++++-
src/main/csharp/IMessageConverter.cs | 28 +-
src/main/csharp/IMessageConverterEx.cs | 44 +
src/main/csharp/MessageConsumer.cs | 93 +-
src/main/csharp/QueueBrowser.cs | 31 +-
.../csharp/Readers/AbstractMessageReader.cs | 126 ++
.../Readers/ByCorrelationIdMessageReader.cs | 139 +++
src/main/csharp/Readers/ByIdMessageReader.cs | 136 ++
.../csharp/Readers/ByLookupIdMessageReader.cs | 145 +++
.../csharp/Readers/BySelectorMessageReader.cs | 290 +++++
src/main/csharp/Readers/IMessageReader.cs | 93 ++
src/main/csharp/Readers/MessageReaderUtil.cs | 91 ++
.../csharp/Readers/NonFilteringMessageReader.cs | 128 ++
src/main/csharp/Selector/ANDExpression.cs | 47 +
.../csharp/Selector/AlignedNumericValues.cs | 175 +++
.../csharp/Selector/ArithmeticExpression.cs | 57 +
src/main/csharp/Selector/BinaryExpression.cs | 59 +
.../csharp/Selector/BooleanCastExpression.cs | 45 +
.../Selector/BooleanConstantExpression.cs | 38 +
.../csharp/Selector/BooleanUnaryExpression.cs | 39 +
.../csharp/Selector/ComparisonExpression.cs | 162 +++
src/main/csharp/Selector/ConstantExpression.cs | 157 +++
src/main/csharp/Selector/DivideExpression.cs | 67 +
src/main/csharp/Selector/EqualExpression.cs | 47 +
src/main/csharp/Selector/GreaterExpression.cs | 42 +
.../csharp/Selector/GreaterOrEqualExpression.cs | 43 +
src/main/csharp/Selector/IBooleanExpression.cs | 35 +
src/main/csharp/Selector/IExpression.cs | 35 +
src/main/csharp/Selector/InExpression.cs | 98 ++
src/main/csharp/Selector/IsNullExpression.cs | 59 +
src/main/csharp/Selector/LesserExpression.cs | 42 +
.../csharp/Selector/LesserOrEqualExpression.cs | 43 +
src/main/csharp/Selector/LikeExpression.cs | 124 ++
src/main/csharp/Selector/LogicExpression.cs | 48 +
.../csharp/Selector/MessageEvaluationContext.cs | 78 ++
src/main/csharp/Selector/MinusExpression.cs | 67 +
src/main/csharp/Selector/ModExpression.cs | 67 +
src/main/csharp/Selector/MultiplyExpression.cs | 67 +
src/main/csharp/Selector/NOTExpression.cs | 45 +
src/main/csharp/Selector/NegateExpression.cs | 51 +
src/main/csharp/Selector/ORExpression.cs | 46 +
src/main/csharp/Selector/ParseException.cs | 197 +++
src/main/csharp/Selector/PlusExpression.cs | 68 +
src/main/csharp/Selector/PropertyExpression.cs | 53 +
src/main/csharp/Selector/SelectorParser.cs | 1172 ++++++++++++++++++
src/main/csharp/Selector/SelectorParser.csc | 589 +++++++++
.../csharp/Selector/SelectorParserConstants.cs | 75 ++
.../Selector/SelectorParserTokenManager.cs | 1042 ++++++++++++++++
src/main/csharp/Selector/SimpleCharStream.cs | 366 ++++++
src/main/csharp/Selector/Token.cs | 78 ++
src/main/csharp/Selector/TokenMgrError.cs | 130 ++
src/main/csharp/Selector/UnaryExpression.cs | 66 +
src/main/csharp/Session.cs | 12 +-
vs2008-msmq.csproj | 47 +
55 files changed, 7585 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/BaseMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs
index db73f4c..8ea7e31 100644
--- a/src/main/csharp/BaseMessage.cs
+++ b/src/main/csharp/BaseMessage.cs
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -24,28 +24,9 @@ namespace Apache.NMS.MSMQ
public class BaseMessage : IMessage
{
- private PrimitiveMap propertiesMap = new PrimitiveMap();
- private IDestination destination;
- private string correlationId;
- private TimeSpan timeToLive;
- private string messageId;
- private MsgDeliveryMode deliveryMode;
- private MsgPriority priority;
- private Destination replyTo;
- private byte[] content;
- private string type;
- private event AcknowledgeHandler Acknowledger;
- private DateTime timestamp = new DateTime();
- private bool readOnlyMsgBody = false;
-
- public bool ReadOnlyBody
- {
- get { return readOnlyMsgBody; }
- set { readOnlyMsgBody = value; }
- }
-
- // IMessage interface
+ #region Acknowledgement
+ private event AcknowledgeHandler Acknowledger;
public void Acknowledge()
{
if(null != Acknowledger)
@@ -54,6 +35,27 @@ namespace Apache.NMS.MSMQ
}
}
+ #endregion
+
+ #region Message body
+
+ private byte[] content;
+ public byte[] Content
+ {
+ get { return content; }
+ set { this.content = value; }
+ }
+
+ private bool readOnlyMsgBody = false;
+ /// <summary>
+ /// Whether the message body is read-only.
+ /// </summary>
+ public bool ReadOnlyBody
+ {
+ get { return readOnlyMsgBody; }
+ set { readOnlyMsgBody = value; }
+ }
+
/// <summary>
/// Clears out the message body. Clearing a message's body does not clear its header
/// values or property entries.
@@ -67,26 +69,82 @@ namespace Apache.NMS.MSMQ
this.readOnlyMsgBody = false;
}
+ #endregion
+
+ #region Message properties
+
+ private PrimitiveMap propertiesMap = new PrimitiveMap();
+ private MessagePropertyIntercepter propertyHelper;
+ /// <summary>
+ /// Provides access to the message properties (headers)
+ /// </summary>
+ public Apache.NMS.IPrimitiveMap Properties
+ {
+ get
+ {
+ if(propertyHelper == null)
+ {
+ propertyHelper = new Apache.NMS.Util.MessagePropertyIntercepter(
+ this, propertiesMap, this.ReadOnlyProperties);
+ }
+
+ return propertyHelper;
+ }
+ }
+
+ private bool readOnlyMsgProperties = false;
+ /// <summary>
+ /// Whether the message properties are read-only.
+ /// </summary>
+ public virtual bool ReadOnlyProperties
+ {
+ get { return this.readOnlyMsgProperties; }
+
+ set
+ {
+ if(this.propertyHelper != null)
+ {
+ this.propertyHelper.ReadOnly = value;
+ }
+ this.readOnlyMsgProperties = value;
+ }
+ }
+
/// <summary>
/// Clears a message's properties.
- ///
/// The message's header fields and body are not cleared.
/// </summary>
- public virtual void ClearProperties()
+ public void ClearProperties()
{
- propertiesMap.Clear();
+ this.ReadOnlyProperties = false;
+ this.propertiesMap.Clear();
}
- // Properties
+ public object GetObjectProperty(string name)
+ {
+ return Properties[name];
+ }
- public IPrimitiveMap Properties
+ public void SetObjectProperty(string name, object value)
{
- get { return propertiesMap; }
+ Properties[name] = value;
}
+ #endregion
- // NMS headers
+ #region Message header fields
+ private string messageId;
+ /// <summary>
+ /// The message ID which is set by the provider
+ /// </summary>
+ public string NMSMessageId
+ {
+ get { return messageId; }
+ set { messageId = value; }
+ }
+
+ private string correlationId;
/// <summary>
/// The correlation ID used to correlate messages with conversations or long running business processes
/// </summary>
@@ -96,6 +154,7 @@ namespace Apache.NMS.MSMQ
set { correlationId = value; }
}
+ private IDestination destination;
/// <summary>
/// The destination of the message
/// </summary>
@@ -105,6 +164,7 @@ namespace Apache.NMS.MSMQ
set { destination = value; }
}
+ private TimeSpan timeToLive;
/// <summary>
/// The time in milliseconds that this message should expire in
/// </summary>
@@ -114,15 +174,7 @@ namespace Apache.NMS.MSMQ
set { timeToLive = value; }
}
- /// <summary>
- /// The message ID which is set by the provider
- /// </summary>
- public string NMSMessageId
- {
- get { return messageId; }
- set { messageId = value; }
- }
-
+ private MsgDeliveryMode deliveryMode;
/// <summary>
/// Whether or not this message is persistent
/// </summary>
@@ -132,6 +184,7 @@ namespace Apache.NMS.MSMQ
set { deliveryMode = value; }
}
+ private MsgPriority priority;
/// <summary>
/// The Priority on this message
/// </summary>
@@ -150,7 +203,7 @@ namespace Apache.NMS.MSMQ
set { }
}
-
+ private Destination replyTo;
/// <summary>
/// The destination that the consumer of this message should send replies to
/// </summary>
@@ -160,7 +213,7 @@ namespace Apache.NMS.MSMQ
set { replyTo = (Destination) value; }
}
-
+ private DateTime timestamp = new DateTime();
/// <summary>
/// The timestamp the broker added to the message
/// </summary>
@@ -170,12 +223,7 @@ namespace Apache.NMS.MSMQ
set { timestamp = value; }
}
- public byte[] Content
- {
- get { return content; }
- set { this.content = value; }
- }
-
+ private string type;
/// <summary>
/// The type name of this message
/// </summary>
@@ -185,15 +233,9 @@ namespace Apache.NMS.MSMQ
set { type = value; }
}
+ #endregion
- public object GetObjectProperty(string name)
- {
- return null;
- }
-
- public void SetObjectProperty(string name, object value)
- {
- }
+ #region Check access mode
protected void FailIfReadOnlyBody()
{
@@ -205,11 +247,13 @@ namespace Apache.NMS.MSMQ
protected void FailIfWriteOnlyBody()
{
- if( ReadOnlyBody == false )
+ if(ReadOnlyBody == false)
{
throw new MessageNotReadableException("Message is in Write-Only mode.");
}
}
+
+ #endregion
}
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/DefaultMessageConverter.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/DefaultMessageConverter.cs b/src/main/csharp/DefaultMessageConverter.cs
index 2aa3438..83097fc 100644
--- a/src/main/csharp/DefaultMessageConverter.cs
+++ b/src/main/csharp/DefaultMessageConverter.cs
@@ -32,12 +32,61 @@ namespace Apache.NMS.MSMQ
StreamMessage
}
- public class DefaultMessageConverter : IMessageConverter
+ /// <summary>
+ /// This class provides default rules for converting MSMQ to and from
+ /// NMS messages, when the peer system expects or produces compatible
+ /// mappings, typically when the peer system is also implemented on
+ /// Apache.NMS.
+ /// Default mappings are as follows :
+ /// <ul>
+ /// <li>
+ /// the MSMQ Message.AppSpecific field is used for specifying the NMS
+ /// message type, as specified by the <c>NMSMessageType</c> enumeration.
+ /// </li>
+ /// <li>
+ /// the MSMQ Message.Extension field is populated with a map
+ /// (a marshalled <c>PrimitiveMap</c>) of message properties.
+ /// </li>
+ /// <li>
+ /// in earlier versions of Apache.NMS.MSMQ, the MSMQ Message.Label
+ /// field was populated with the value of the NMSType field. Setting
+ /// <c>SetLabelAsNMSType</c> to true (the default value) applies that
+ /// same rule, which makes it compatible with existing NMS peers. If
+ /// set to false, the Message.Label field is populated with the value
+ /// of a "Label" property, if it exists, thus making it readable by
+ /// standard management or monitoring tools. The NMSType value is then
+ /// transmitted as a field in the Message.Extension map.
+ /// </li>
+ /// </ul>
+ /// Please note that in earlier versions of Apache.NMS, only one property
+ /// was set in the Message.Extension field : the NMSCorrelationID.
+ /// The native Message.CorrelationId field is not settable, except for
+ /// reply messages explicitly created as such through the MSMQ API.
+ /// Transmission of the correlation id through a mapped property called
+ /// NMSCorrelationID is therefore maintained.
+ /// When exchanging messages with a non compatible peer, a specific
+ /// message converter must be provided, which should at least be able to
+ /// map message types and define the encoding used for text messages.
+ /// </summary>
+ public class DefaultMessageConverter : IMessageConverterEx
{
+ private bool setLabelAsNMSType = true;
+ public bool SetLabelAsNMSType
+ {
+ get { return setLabelAsNMSType; }
+ set { setLabelAsNMSType = value; }
+ }
+
+ #region Messages
+ /// <summary>
+ /// Converts the specified NMS message to an equivalent MSMQ message.
+ /// </summary>
+ /// <param name="message">NMS message to be converted.</param>
+ /// <result>Converted MSMQ message.</result>
public virtual Message ToMsmqMessage(IMessage message)
{
Message msmqMessage = new Message();
- PrimitiveMap metaData = new PrimitiveMap();
+ PrimitiveMap propertyData = new PrimitiveMap();
ConvertMessageBodyToMSMQ(message, msmqMessage);
@@ -48,32 +97,72 @@ namespace Apache.NMS.MSMQ
if(message.NMSCorrelationID != null)
{
- metaData.SetString("NMSCorrelationID", message.NMSCorrelationID);
+ propertyData.SetString("NMSCorrelationID", message.NMSCorrelationID);
}
msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
- msmqMessage.Priority = ToMessagePriority(message.NMSPriority);
+ msmqMessage.Priority = ToMsmqMessagePriority(message.NMSPriority);
msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);
if(message.NMSType != null)
{
- msmqMessage.Label = message.NMSType;
+ if(SetLabelAsNMSType)
+ {
+ propertyData.SetString("NMSType", message.NMSType);
+ }
+ else
+ {
+ msmqMessage.Label = message.NMSType;
+ }
}
- // Store the NMS meta data in the extension area
- msmqMessage.Extension = metaData.Marshal();
+ // Populate property data
+ foreach(object keyObject in message.Properties.Keys)
+ {
+ string key = (keyObject as string);
+ object val = message.Properties.GetString(key);
+ if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null)
+ {
+ msmqMessage.Label = val.ToString();
+ }
+ else
+ {
+ propertyData[key] = val;
+ }
+ }
+
+ // Store the NMS property data in the extension area
+ msmqMessage.Extension = propertyData.Marshal();
return msmqMessage;
}
+ /// <summary>
+ /// Converts the specified MSMQ message to an equivalent NMS message
+ /// (including its message body).
+ /// </summary>
+ /// <param name="message">MSMQ message to be converted.</param>
+ /// <result>Converted NMS message.</result>
public virtual IMessage ToNmsMessage(Message message)
{
- BaseMessage answer = CreateNmsMessage(message);
- // Get the NMS meta data from the extension area
- PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension);
+ return ToNmsMessage(message, true);
+ }
+
+ /// <summary>
+ /// Converts the specified MSMQ message to an equivalent NMS message.
+ /// </summary>
+ /// <param name="message">MSMQ message to be converted.</param>
+ /// <param name="convertBody">true if message body should be converted.</param>
+ /// <result>Converted NMS message.</result>
+ public virtual IMessage ToNmsMessage(Message message, bool convertBody)
+ {
+ BaseMessage answer = CreateNmsMessage(message, convertBody);
+
+ // Get the NMS property data from the extension area
+ PrimitiveMap propertyData = PrimitiveMap.Unmarshal(message.Extension);
try
{
answer.NMSMessageId = message.Id;
- answer.NMSCorrelationID = metaData.GetString("NMSCorrelationID");
+ answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID");
answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
}
@@ -83,18 +172,85 @@ namespace Apache.NMS.MSMQ
try
{
- answer.NMSType = message.Label;
answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
answer.NMSTimeToLive = message.TimeToBeReceived;
+ answer.NMSPriority = ToNmsMsgPriority(message.Priority);
+ }
+ catch(InvalidOperationException)
+ {
+ }
+
+ try
+ {
+ if(message.Label != null)
+ {
+ if(SetLabelAsNMSType)
+ {
+ answer.NMSType = message.Label;
+ }
+ else
+ {
+ answer.Properties["Label"] = message.Label;
+ }
+ }
+ answer.Properties["LookupId"] = message.LookupId;
}
catch(InvalidOperationException)
{
}
+ foreach(object keyObject in propertyData.Keys)
+ {
+ try
+ {
+ string key = (keyObject as string);
+ if(string.Compare(key, "NMSType", true) == 0)
+ {
+ answer.NMSType = propertyData.GetString(key);
+ }
+ else if(string.Compare(key, "NMSCorrelationID", true) == 0)
+ {
+ answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID");
+ }
+ else
+ {
+ answer.Properties[key] = propertyData[key];
+ }
+ }
+ catch(InvalidOperationException)
+ {
+ }
+ }
return answer;
}
- private static MessagePriority ToMessagePriority(MsgPriority msgPriority)
+ #endregion
+
+ #region Message priority
+
+ // Message priorities are defined as follows :
+ // | MSMQ | NMS |
+ // | MessagePriority | MsgPriority |
+ // +--------------------+--------------------+
+ // | Lowest | Lowest |
+ // | VeryLow | VeryLow |
+ // | Low | Low |
+ // | \-> | AboveLow |
+ // | /-> | BelowNormal |
+ // | Normal | Normal |
+ // | AboveNormal | AboveNormal |
+ // | High | High |
+ // | VeryHigh | VeryHigh |
+ // | Highest | Highest |
+ // +--------------------+--------------------+
+
+ /// <summary>
+ /// Converts the specified NMS message priority to an equivalent MSMQ
+ /// message priority.
+ /// </summary>
+ /// <param name="msgPriority">NMS message priority to be converted.</param>
+ /// <result>Converted MSMQ message priority.</result>
+ private static MessagePriority ToMsmqMessagePriority(MsgPriority msgPriority)
{
switch(msgPriority)
{
@@ -127,6 +283,153 @@ namespace Apache.NMS.MSMQ
}
}
+ /// <summary>
+ /// Converts the specified MSMQ message priority to an equivalent NMS
+ /// message priority.
+ /// </summary>
+ /// <param name="messagePriority">MSMQ message priority to be converted.</param>
+ /// <result>Converted NMS message priority.</result>
+ private static MsgPriority ToNmsMsgPriority(MessagePriority messagePriority)
+ {
+ switch(messagePriority)
+ {
+ case MessagePriority.Lowest:
+ return MsgPriority.Lowest;
+
+ case MessagePriority.VeryLow:
+ return MsgPriority.VeryLow;
+
+ case MessagePriority.Low:
+ return MsgPriority.Low;
+
+ default:
+ case MessagePriority.Normal:
+ return MsgPriority.Normal;
+
+ case MessagePriority.AboveNormal:
+ return MsgPriority.AboveNormal;
+
+ case MessagePriority.High:
+ return MsgPriority.High;
+
+ case MessagePriority.VeryHigh:
+ return MsgPriority.VeryHigh;
+
+ case MessagePriority.Highest:
+ return MsgPriority.Highest;
+ }
+ }
+
+ #endregion
+
+ #region Message creation
+
+ // Conversion of the message body has been separated from the creation
+ // of the NMS message object for performance reasons when using
+ // selectors (selectors handle only message attributes, not message
+ // bodies).
+ // CreateNmsMessage(Message) is maintained for compatibility reasons
+ // with existing clients that may have implemented derived classes,
+ // instead of completely removing the body conversion part from the
+ // method.
+
+ /// <summary>
+ /// Creates an NMS message of appropriate type for the specified MSMQ
+ /// message, and converts the message body.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <result>NMS message created for retrieving the MSMQ message.</result>
+ protected virtual BaseMessage CreateNmsMessage(Message message)
+ {
+ return CreateNmsMessage(message, true);
+ }
+
+ /// <summary>
+ /// Creates an NMS message of appropriate type for the specified MSMQ
+ /// message, and converts the message body if specified.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <param name="convertBody">true if the message body must be
+ /// converted.</param>
+ /// <result>NMS message created for retrieving the MSMQ message.</result>
+ protected virtual BaseMessage CreateNmsMessage(Message message,
+ bool convertBody)
+ {
+ BaseMessage result = null;
+
+ if((int) NMSMessageType.TextMessage == message.AppSpecific)
+ {
+ TextMessage textMessage = new TextMessage();
+
+ if(convertBody)
+ {
+ ConvertTextMessageBodyToNMS(message, textMessage);
+ }
+
+ result = textMessage;
+ }
+ else if((int) NMSMessageType.BytesMessage == message.AppSpecific)
+ {
+ BytesMessage bytesMessage = new BytesMessage();
+
+ if(convertBody)
+ {
+ ConvertBytesMessageBodyToNMS(message, bytesMessage);
+ }
+
+ result = bytesMessage;
+ }
+ else if((int) NMSMessageType.ObjectMessage == message.AppSpecific)
+ {
+ ObjectMessage objectMessage = new ObjectMessage();
+
+ if(convertBody)
+ {
+ ConvertObjectMessageBodyToNMS(message, objectMessage);
+ }
+
+ result = objectMessage;
+ }
+ else if((int) NMSMessageType.MapMessage == message.AppSpecific)
+ {
+ MapMessage mapMessage = new MapMessage();
+
+ if(convertBody)
+ {
+ ConvertMapMessageBodyToNMS(message, mapMessage);
+ }
+
+ result = mapMessage;
+ }
+ else if((int) NMSMessageType.StreamMessage == message.AppSpecific)
+ {
+ StreamMessage streamMessage = new StreamMessage();
+
+ if(convertBody)
+ {
+ ConvertStreamMessageBodyToNMS(message, streamMessage);
+ }
+
+ result = streamMessage;
+ }
+ else
+ {
+ BaseMessage baseMessage = new BaseMessage();
+ result = baseMessage;
+ }
+
+ return result;
+ }
+
+ #endregion
+
+ #region Message body
+
+ /// <summary>
+ /// Converts an NMS message body to the equivalent MSMQ message body.
+ /// </summary>
+ /// <param name="message">Source NMS message.</param>
+ /// <param name="answer">Target MSMQ message.</param>
protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
{
if(message is TextMessage)
@@ -172,78 +475,133 @@ namespace Apache.NMS.MSMQ
}
}
- protected virtual BaseMessage CreateNmsMessage(Message message)
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS message body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS message.</param>
+ public virtual void ConvertMessageBodyToNMS(Message message, IMessage answer)
{
- BaseMessage result = null;
-
- if((int) NMSMessageType.TextMessage == message.AppSpecific)
+ if(answer is TextMessage)
{
- TextMessage textMessage = new TextMessage();
- string content = String.Empty;
-
- if(message.BodyStream != null && message.BodyStream.Length > 0)
- {
- byte[] buf = null;
- buf = new byte[message.BodyStream.Length];
- message.BodyStream.Read(buf, 0, buf.Length);
- content = Encoding.UTF32.GetString(buf);
- }
-
- textMessage.Text = content;
- result = textMessage;
+ ConvertTextMessageBodyToNMS(message, (TextMessage)answer);
}
- else if((int) NMSMessageType.BytesMessage == message.AppSpecific)
+ else if(answer is BytesMessage)
{
- byte[] buf = null;
-
- if(message.BodyStream != null && message.BodyStream.Length > 0)
- {
- buf = new byte[message.BodyStream.Length];
- message.BodyStream.Read(buf, 0, buf.Length);
- }
-
- BytesMessage bytesMessage = new BytesMessage();
- bytesMessage.Content = buf;
- result = bytesMessage;
+ ConvertBytesMessageBodyToNMS(message, (BytesMessage)answer);
}
- else if((int) NMSMessageType.ObjectMessage == message.AppSpecific)
+ else if(answer is ObjectMessage)
{
- ObjectMessage objectMessage = new ObjectMessage();
-
- objectMessage.Body = message.Body;
- result = objectMessage;
+ ConvertObjectMessageBodyToNMS(message, (ObjectMessage)answer);
}
- else if((int) NMSMessageType.MapMessage == message.AppSpecific)
+ else if(answer is MapMessage)
{
- byte[] buf = null;
-
- if(message.BodyStream != null && message.BodyStream.Length > 0)
- {
- buf = new byte[message.BodyStream.Length];
- message.BodyStream.Read(buf, 0, buf.Length);
- }
-
- MapMessage mapMessage = new MapMessage();
- mapMessage.Body = PrimitiveMap.Unmarshal(buf);
- result = mapMessage;
+ ConvertMapMessageBodyToNMS(message, (MapMessage)answer);
}
- else if((int) NMSMessageType.StreamMessage == message.AppSpecific)
+ else if(answer is StreamMessage)
{
- StreamMessage streamMessage = new StreamMessage();
+ ConvertStreamMessageBodyToNMS(message, (StreamMessage)answer);
+ }
- // TODO: Implement
- result = streamMessage;
+ return;
+ }
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS text message
+ /// body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS text message.</param>
+ public virtual void ConvertTextMessageBodyToNMS(Message message,
+ TextMessage answer)
+ {
+ string content = String.Empty;
+
+ if(message.BodyStream != null && message.BodyStream.Length > 0)
+ {
+ byte[] buf = new byte[message.BodyStream.Length];
+ message.BodyStream.Read(buf, 0, buf.Length);
+ content = Encoding.UTF32.GetString(buf);
}
- else
+
+ answer.Text = content;
+ }
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS bytes message
+ /// body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS bytes message.</param>
+ public virtual void ConvertBytesMessageBodyToNMS(Message message,
+ BytesMessage answer)
+ {
+ byte[] buf = null;
+
+ if(message.BodyStream != null && message.BodyStream.Length > 0)
{
- BaseMessage baseMessage = new BaseMessage();
+ buf = new byte[message.BodyStream.Length];
+ message.BodyStream.Read(buf, 0, buf.Length);
+ }
- result = baseMessage;
+ answer.Content = buf;
+ }
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS object message
+ /// body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS object message.</param>
+ public virtual void ConvertObjectMessageBodyToNMS(Message message,
+ ObjectMessage answer)
+ {
+ answer.Body = message.Body;
+ }
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS map message
+ /// body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS map message.</param>
+ public virtual void ConvertMapMessageBodyToNMS(Message message,
+ MapMessage answer)
+ {
+ byte[] buf = null;
+
+ if(message.BodyStream != null && message.BodyStream.Length > 0)
+ {
+ buf = new byte[message.BodyStream.Length];
+ message.BodyStream.Read(buf, 0, buf.Length);
}
- return result;
+ answer.Body = PrimitiveMap.Unmarshal(buf);
+ }
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS stream message
+ /// body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS stream message.</param>
+ public virtual void ConvertStreamMessageBodyToNMS(Message message,
+ StreamMessage answer)
+ {
+ // TODO: Implement
+ throw new NotImplementedException();
}
+ #endregion
+
+ #region Destination
+
+ /// <summary>
+ /// Converts an NMS destination to the equivalent MSMQ destination
+ /// (ie. queue).
+ /// </summary>
+ /// <param name="destination">NMS destination.</param>
+ /// <result>MSMQ queue.</result>
public MessageQueue ToMsmqDestination(IDestination destination)
{
if(null == destination)
@@ -254,6 +612,12 @@ namespace Apache.NMS.MSMQ
return new MessageQueue((destination as Destination).Path);
}
+ /// <summary>
+ /// Converts an MSMQ destination (ie. queue) to the equivalent NMS
+ /// destination.
+ /// </summary>
+ /// <param name="destinationQueue">MSMQ destination queue.</param>
+ /// <result>NMS destination.</result>
protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
{
if(null == destinationQueue)
@@ -263,5 +627,7 @@ namespace Apache.NMS.MSMQ
return new Queue(destinationQueue.Path);
}
+
+ #endregion
}
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/IMessageConverter.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/IMessageConverter.cs b/src/main/csharp/IMessageConverter.cs
index 152377b..14c6669 100644
--- a/src/main/csharp/IMessageConverter.cs
+++ b/src/main/csharp/IMessageConverter.cs
@@ -20,15 +20,27 @@ namespace Apache.NMS.MSMQ
{
public interface IMessageConverter
{
-
- /// <summary>
- /// Method ToMSMQMessageQueue
- /// </summary>
- /// <param name="destination">An IDestination</param>
- /// <returns>A MessageQueue</returns>
- MessageQueue ToMsmqDestination(IDestination destination);
-
+ /// <summary>
+ /// Converts the specified NMS message to an equivalent MSMQ message.
+ /// </summary>
+ /// <param name="message">NMS message to be converted.</param>
+ /// <result>Converted MSMQ message.</result>
Message ToMsmqMessage(IMessage message);
+
+ /// <summary>
+ /// Converts the specified MSMQ message to an equivalent NMS message
+ /// (including its message body).
+ /// </summary>
+ /// <param name="message">MSMQ message to be converted.</param>
+ /// <result>Converted NMS message.</result>
IMessage ToNmsMessage(Message message);
+
+ /// <summary>
+ /// Converts an NMS destination to the equivalent MSMQ destination
+ /// (ie. queue).
+ /// </summary>
+ /// <param name="destination">NMS destination.</param>
+ /// <result>MSMQ queue.</result>
+ MessageQueue ToMsmqDestination(IDestination destination);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/IMessageConverterEx.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/IMessageConverterEx.cs b/src/main/csharp/IMessageConverterEx.cs
new file mode 100644
index 0000000..92be928
--- /dev/null
+++ b/src/main/csharp/IMessageConverterEx.cs
@@ -0,0 +1,44 @@
+using System.Messaging;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ
+{
+ /// <summary>
+ /// Extended IMessageConverter interface supporting new methods for
+ /// optimizing message selection through "selectors".
+ /// The original IMessageConverter is maintained for compatibility
+ /// reasons with existing clients implementing it.
+ /// </summary>
+ public interface IMessageConverterEx : IMessageConverter
+ {
+ /// <summary>
+ /// Converts the specified MSMQ message to an equivalent NMS message.
+ /// </summary>
+ /// <param name="message">MSMQ message to be converted.</param>
+ /// <param name="convertBody">true if message body should be converted.</param>
+ /// <result>Converted NMS message.</result>
+ IMessage ToNmsMessage(Message message, bool convertBody);
+
+ /// <summary>
+ /// Converts an MSMQ message body to the equivalent NMS message body.
+ /// </summary>
+ /// <param name="message">Source MSMQ message.</param>
+ /// <param name="answer">Target NMS message.</param>
+ void ConvertMessageBodyToNMS(Message message, IMessage answer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 6961298..eaaed5c 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -1,6 +1,8 @@
using System;
using System.Messaging;
using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,12 +19,11 @@ using System.Threading;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using Apache.NMS.Util;
namespace Apache.NMS.MSMQ
{
/// <summary>
- /// An object capable of receiving messages from some destination
+ /// An object capable of receiving messages from some destination.
/// </summary>
public class MessageConsumer : IMessageConsumer
{
@@ -31,8 +32,6 @@ namespace Apache.NMS.MSMQ
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
private MessageQueue messageQueue;
- private event MessageListener listener;
- private int listenerCount = 0;
private Thread asyncDeliveryThread = null;
private AutoResetEvent pause = new AutoResetEvent(false);
private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
@@ -44,17 +43,46 @@ namespace Apache.NMS.MSMQ
set { this.consumerTransformer = value; }
}
- public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+ private IMessageReader reader;
+
+ /// <summary>
+ /// Constructs a message consumer on the specified queue.
+ /// </summary>
+ /// <param name="session">The messaging session.</param>
+ /// <param name="acknowledgementMode">The message acknowledgement mode.</param>
+ /// <param name="messageQueue">The message queue to consume messages from.</param>
+ public MessageConsumer(Session session,
+ AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+ : this(session, acknowledgementMode, messageQueue, null)
+ {
+ }
+
+ /// <summary>
+ /// Constructs a message consumer on the specified queue, using a
+ /// selector for filtering incoming messages.
+ /// </summary>
+ /// <param name="session">The messaging session.</param>
+ /// <param name="acknowledgementMode">The message acknowledgement mode.</param>
+ /// <param name="messageQueue">The message queue to consume messages from.</param>
+ /// <param name="selector">The selection criteria.</param>
+ public MessageConsumer(Session session,
+ AcknowledgementMode acknowledgementMode, MessageQueue messageQueue,
+ string selector)
{
this.session = session;
this.acknowledgementMode = acknowledgementMode;
this.messageQueue = messageQueue;
- if(null != this.messageQueue)
+ if(this.messageQueue != null)
{
this.messageQueue.MessageReadPropertyFilter.SetAll();
}
+
+ reader = MessageReaderUtil.CreateMessageReader(
+ messageQueue, session.MessageConverter, selector);
}
+ private int listenerCount = 0;
+ private event MessageListener listener;
public event MessageListener Listener
{
add
@@ -85,32 +113,8 @@ namespace Apache.NMS.MSMQ
if(messageQueue != null)
{
- Message message;
-
- try
- {
- message = messageQueue.Receive(zeroTimeout);
- }
- catch
- {
- message = null;
- }
-
- if(null == message)
- {
- ReceiveCompletedEventHandler receiveMsg =
- delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
- message = messageQueue.EndReceive(asyncResult.AsyncResult);
- pause.Set();
- };
-
- messageQueue.ReceiveCompleted += receiveMsg;
- messageQueue.BeginReceive();
- pause.WaitOne();
- messageQueue.ReceiveCompleted -= receiveMsg;
- }
-
- nmsMessage = ToNmsMessage(message);
+ nmsMessage = reader.Receive();
+ nmsMessage = TransformMessage(nmsMessage);
}
return nmsMessage;
@@ -122,8 +126,8 @@ namespace Apache.NMS.MSMQ
if(messageQueue != null)
{
- Message message = messageQueue.Receive(timeout);
- nmsMessage = ToNmsMessage(message);
+ nmsMessage = reader.Receive(timeout);
+ nmsMessage = TransformMessage(nmsMessage);
}
return nmsMessage;
@@ -135,8 +139,8 @@ namespace Apache.NMS.MSMQ
if(messageQueue != null)
{
- Message message = messageQueue.Receive(zeroTimeout);
- nmsMessage = ToNmsMessage(message);
+ nmsMessage = reader.Receive(zeroTimeout);
+ nmsMessage = TransformMessage(nmsMessage);
}
return nmsMessage;
@@ -226,25 +230,20 @@ namespace Apache.NMS.MSMQ
session.Connection.HandleException(e);
}
- protected virtual IMessage ToNmsMessage(Message message)
+ protected virtual IMessage TransformMessage(IMessage message)
{
- if(message == null)
- {
- return null;
- }
-
- IMessage converted = session.MessageConverter.ToNmsMessage(message);
+ IMessage transformed = message;
- if(this.ConsumerTransformer != null)
+ if(message != null && this.ConsumerTransformer != null)
{
- IMessage newMessage = ConsumerTransformer(this.session, this, converted);
+ IMessage newMessage = ConsumerTransformer(this.session, this, message);
if(newMessage != null)
{
- converted = newMessage;
+ transformed = newMessage;
}
}
- return converted;
+ return transformed;
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/QueueBrowser.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/QueueBrowser.cs b/src/main/csharp/QueueBrowser.cs
index 32752c5..3ff795d 100644
--- a/src/main/csharp/QueueBrowser.cs
+++ b/src/main/csharp/QueueBrowser.cs
@@ -19,6 +19,7 @@ using System.Collections;
using System.Messaging;
using Apache.NMS;
using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
namespace Apache.NMS.MSMQ
{
@@ -30,7 +31,17 @@ namespace Apache.NMS.MSMQ
private readonly Session session;
private MessageQueue messageQueue;
+ private string selector;
+
+ private IMessageReader reader;
+
public QueueBrowser(Session session, MessageQueue messageQueue)
+ : this(session, messageQueue, null)
+ {
+ }
+
+ /// <summary>
+ /// Constructs a queue browser on the specified queue, using a
+ /// selector for filtering the browsed messages.
+ /// </summary>
+ /// <param name="session">The messaging session.</param>
+ /// <param name="messageQueue">The message queue to browse.</param>
+ /// <param name="selector">The selection criteria; kept so that the
+ /// MessageSelector property can report it.</param>
+ public QueueBrowser(Session session, MessageQueue messageQueue,
+ string selector)
{
this.session = session;
this.messageQueue = messageQueue;
@@ -39,6 +50,8 @@ namespace Apache.NMS.MSMQ
this.messageQueue.MessageReadPropertyFilter.SetAll();
}
+ // Keep the selector: the MessageSelector getter returns this field,
+ // and nothing else in the class assigns it.
+ this.selector = selector;
+ reader = MessageReaderUtil.CreateMessageReader(
+ messageQueue, session.MessageConverter, selector);
}
~QueueBrowser()
@@ -95,7 +108,7 @@ namespace Apache.NMS.MSMQ
public string MessageSelector
{
- get { throw new NotSupportedException(); }
+ get { return selector; }
}
public IQueue Queue
@@ -107,11 +120,14 @@ namespace Apache.NMS.MSMQ
{
private readonly Session session;
private readonly MessageEnumerator innerEnumerator;
+ private readonly IMessageReader reader;
- public Enumerator(Session session, MessageQueue messageQueue)
+ public Enumerator(Session session, MessageQueue messageQueue,
+ IMessageReader reader)
{
this.session = session;
this.innerEnumerator = messageQueue.GetMessageEnumerator2();
+ this.reader = reader;
}
public object Current
@@ -124,7 +140,14 @@ namespace Apache.NMS.MSMQ
public bool MoveNext()
{
- return this.innerEnumerator.MoveNext();
+ while(this.innerEnumerator.MoveNext())
+ {
+ if(reader.Matches(this.innerEnumerator.Current))
+ {
+ return true;
+ }
+ }
+ return false;
}
public void Reset()
@@ -135,7 +158,7 @@ namespace Apache.NMS.MSMQ
public IEnumerator GetEnumerator()
{
- return new Enumerator(this.session, this.messageQueue);
+ return new Enumerator(this.session, this.messageQueue, this.reader);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/AbstractMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/AbstractMessageReader.cs b/src/main/csharp/Readers/AbstractMessageReader.cs
new file mode 100644
index 0000000..7874696
--- /dev/null
+++ b/src/main/csharp/Readers/AbstractMessageReader.cs
@@ -0,0 +1,126 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+ /// <summary>
+ /// Abstract MSMQ message reader. Derived classes support various
+ /// message filtering methods.
+ /// </summary>
+ public abstract class AbstractMessageReader : IMessageReader
+ {
+ protected MessageQueue messageQueue;
+ protected IMessageConverter messageConverter;
+ protected IMessageConverterEx messageConverterEx;
+
+ /// <summary>
+ /// Stores the queue and the converter; messageConverterEx is set to the same converter when it implements IMessageConverterEx, or to null otherwise.
+ /// </summary>
+ /// <param name="messageQueue">The MSMQ message queue from which
+ /// messages will be read.</param>
+ /// <param name="messageConverter">A message converter for mapping
+ /// MSMQ messages to NMS messages.</param>
+ public AbstractMessageReader(MessageQueue messageQueue,
+ IMessageConverter messageConverter)
+ {
+ this.messageQueue = messageQueue;
+
+ this.messageConverter = messageConverter;
+ this.messageConverterEx = (messageConverter as IMessageConverterEx);
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue matching the selection criteria.
+ /// The Peek method is synchronous, so it blocks the current thread
+ /// until a message becomes available.
+ /// </summary>
+ /// <returns>Peeked message.</returns>
+ public abstract IMessage Peek();
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue matching the selection criteria.
+ /// The Peek method is synchronous, so it blocks the current thread
+ /// until a message becomes available or the specified time-out occurs.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Peeked message.</returns>
+ public abstract IMessage Peek(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by
+ /// the MessageQueue matching the selection criteria.
+ /// This call is synchronous, and blocks the current thread of execution
+ /// until a message is available.
+ /// </summary>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive();
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by the
+ /// MessageQueue matching the selection criteria, and waits until either
+ /// a message is available in the queue, or the time-out expires.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue matching the selection criteria.
+ /// This call is synchronous, and blocks the current thread of execution
+ /// until a message is available.
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive(MessageQueueTransaction transaction);
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue matching the selection criteria,
+ /// and waits until either a message is available in the queue, or the
+ /// time-out expires.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public abstract IMessage Receive(TimeSpan timeSpan,
+ MessageQueueTransaction transaction);
+
+ /// <summary>
+ /// Checks if an MSMQ message matches the selection criteria.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <returns>true if the message matches the selection criteria.</returns>
+ public abstract bool Matches(Message message);
+
+ /// <summary>
+ /// Converts an MSMQ message to an NMS message, using the converter
+ /// specified at construction time. A null message is passed through.
+ /// </summary>
+ /// <param name="message">MSMQ message, possibly null.</param>
+ /// <returns>NMS message, or null if the MSMQ message is null.</returns>
+ protected IMessage Convert(Message message)
+ {
+ return message == null ? null : messageConverter.ToNmsMessage(message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
new file mode 100644
index 0000000..fad3d1a
--- /dev/null
+++ b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
@@ -0,0 +1,139 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+ /// <summary>
+ /// MSMQ message reader, returning messages matching the specified
+ /// correlation identifier.
+ /// </summary>
+ public class ByCorrelationIdMessageReader : AbstractMessageReader
+ {
+ private string correlationId;
+
+ /// <summary>
+ /// Stores the correlation identifier; queue and converter go to the base class.
+ /// </summary>
+ /// <param name="messageQueue">The MSMQ message queue from which
+ /// messages will be read.</param>
+ /// <param name="messageConverter">A message converter for mapping
+ /// MSMQ messages to NMS messages.</param>
+ /// <param name="correlationId">The correlation identifier of messages
+ /// to be read.</param>
+ public ByCorrelationIdMessageReader(MessageQueue messageQueue,
+ IMessageConverter messageConverter, string correlationId)
+ : base(messageQueue, messageConverter)
+ {
+ this.correlationId = correlationId;
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue having the correlation identifier
+ /// given at construction time, by calling PeekByCorrelationId on
+ /// the queue and converting the result.
+ /// </summary>
+ /// <returns>Peeked message.</returns>
+ public override IMessage Peek()
+ {
+ return Convert(messageQueue.PeekByCorrelationId(correlationId));
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue having the correlation identifier
+ /// given at construction time, by calling PeekByCorrelationId on
+ /// the queue with the specified time-out and converting the result.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Peeked message.</returns>
+ public override IMessage Peek(TimeSpan timeSpan)
+ {
+ return Convert(messageQueue.PeekByCorrelationId(correlationId,
+ timeSpan));
+ }
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by
+ /// the MessageQueue having the correlation identifier given at
+ /// construction time, by calling ReceiveByCorrelationId on the
+ /// queue and converting the result.
+ /// </summary>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive()
+ {
+ return Convert(messageQueue.ReceiveByCorrelationId(correlationId));
+ }
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by the
+ /// MessageQueue having the correlation identifier given at construction
+ /// time, passing the specified time-out to ReceiveByCorrelationId.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(TimeSpan timeSpan)
+ {
+ return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+ timeSpan));
+ }
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue having the correlation identifier
+ /// given at construction time, passing the specified transaction to
+ /// ReceiveByCorrelationId.
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(MessageQueueTransaction transaction)
+ {
+ return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+ transaction));
+ }
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue having the correlation identifier
+ /// given at construction time, passing the specified time-out and
+ /// transaction to ReceiveByCorrelationId.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(TimeSpan timeSpan,
+ MessageQueueTransaction transaction)
+ {
+ return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+ timeSpan, transaction));
+ }
+
+ /// <summary>
+ /// Checks if an MSMQ message has the expected correlation identifier.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <returns>true if the message matches the selection criteria.</returns>
+ public override bool Matches(Message message)
+ {
+ // NB: case-sensitive match
+ return message.CorrelationId == correlationId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByIdMessageReader.cs b/src/main/csharp/Readers/ByIdMessageReader.cs
new file mode 100644
index 0000000..f981ca8
--- /dev/null
+++ b/src/main/csharp/Readers/ByIdMessageReader.cs
@@ -0,0 +1,136 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+ /// <summary>
+ /// MSMQ message reader, returning messages matching the specified
+ /// message identifier.
+ /// </summary>
+ public class ByIdMessageReader : AbstractMessageReader
+ {
+ private string messageId;
+
+ /// <summary>
+ /// Stores the message identifier; queue and converter go to the base class.
+ /// </summary>
+ /// <param name="messageQueue">The MSMQ message queue from which
+ /// messages will be read.</param>
+ /// <param name="messageConverter">A message converter for mapping
+ /// MSMQ messages to NMS messages.</param>
+ /// <param name="messageId">The message identifier of messages to
+ /// be read.</param>
+ public ByIdMessageReader(MessageQueue messageQueue,
+ IMessageConverter messageConverter, string messageId)
+ : base(messageQueue, messageConverter)
+ {
+ this.messageId = messageId;
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the message in the queue
+ /// referenced by this MessageQueue having the message identifier
+ /// given at construction time, by calling PeekById on the queue
+ /// and converting the result.
+ /// </summary>
+ /// <returns>Peeked message.</returns>
+ public override IMessage Peek()
+ {
+ return Convert(messageQueue.PeekById(messageId));
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the message in the queue
+ /// referenced by this MessageQueue having the message identifier
+ /// given at construction time, by calling PeekById on the queue
+ /// with the specified time-out and converting the result.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Peeked message.</returns>
+ public override IMessage Peek(TimeSpan timeSpan)
+ {
+ return Convert(messageQueue.PeekById(messageId, timeSpan));
+ }
+
+ /// <summary>
+ /// Receives the message in the queue referenced by the
+ /// MessageQueue having the message identifier given at
+ /// construction time, by calling ReceiveById on the queue and
+ /// converting the result.
+ /// </summary>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive()
+ {
+ return Convert(messageQueue.ReceiveById(messageId));
+ }
+
+ /// <summary>
+ /// Receives the message in the queue referenced by the MessageQueue
+ /// having the message identifier given at construction time,
+ /// passing the specified time-out to ReceiveById.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(TimeSpan timeSpan)
+ {
+ return Convert(messageQueue.ReceiveById(messageId, timeSpan));
+ }
+
+ /// <summary>
+ /// Receives the message in the transactional queue referenced by
+ /// the MessageQueue having the message identifier given at
+ /// construction time, passing the specified transaction to
+ /// ReceiveById.
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(MessageQueueTransaction transaction)
+ {
+ return Convert(messageQueue.ReceiveById(messageId, transaction));
+ }
+
+ /// <summary>
+ /// Receives the message in the transactional queue referenced by
+ /// the MessageQueue having the message identifier given at
+ /// construction time, passing the specified time-out and
+ /// transaction to ReceiveById.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(TimeSpan timeSpan,
+ MessageQueueTransaction transaction)
+ {
+ return Convert(messageQueue.ReceiveById(messageId, timeSpan,
+ transaction));
+ }
+
+ /// <summary>
+ /// Checks if an MSMQ message has the expected message identifier.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <returns>true if the message matches the selection criteria.</returns>
+ public override bool Matches(Message message)
+ {
+ // NB: case-sensitive match
+ return message.Id == messageId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByLookupIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByLookupIdMessageReader.cs b/src/main/csharp/Readers/ByLookupIdMessageReader.cs
new file mode 100644
index 0000000..421c52b
--- /dev/null
+++ b/src/main/csharp/Readers/ByLookupIdMessageReader.cs
@@ -0,0 +1,145 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+ /// <summary>
+ /// MSMQ message reader, returning messages matching the specified
+ /// lookup identifier.
+ /// </summary>
+ public class ByLookupIdMessageReader : AbstractMessageReader
+ {
+ private Int64 lookupId;
+
+ /// <summary>
+ /// Stores the lookup identifier; queue and converter go to the base class.
+ /// </summary>
+ /// <param name="messageQueue">The MSMQ message queue from which
+ /// messages will be read.</param>
+ /// <param name="messageConverter">A message converter for mapping
+ /// MSMQ messages to NMS messages.</param>
+ /// <param name="lookupId">The lookup identifier of the message
+ /// to be read.</param>
+ public ByLookupIdMessageReader(MessageQueue messageQueue,
+ IMessageConverter messageConverter, Int64 lookupId)
+ : base(messageQueue, messageConverter)
+ {
+ this.lookupId = lookupId;
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the message in the queue
+ /// referenced by this MessageQueue having the lookup identifier
+ /// given at construction time. As noted on the time-out overloads,
+ /// the lookup throws immediately if the message is not in the queue.
+ /// </summary>
+ /// <returns>Peeked message.</returns>
+ public override IMessage Peek()
+ {
+ return Convert(messageQueue.PeekByLookupId(lookupId));
+ }
+
+ /// <summary>
+ /// Returns without removing (peeks) the message in the queue
+ /// referenced by this MessageQueue having the lookup identifier
+ /// given at construction time. The time-out is not used: the call
+ /// is forwarded to PeekByLookupId, which is given no time-out.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out (ignored).</param>
+ /// <returns>Peeked message.</returns>
+ public override IMessage Peek(TimeSpan timeSpan)
+ {
+ // No time-out option for receiving messages by lookup identifiers:
+ // either the message is present in the queue, or the method throws
+ // an exception immediately if the message is not in the queue.
+ return Convert(messageQueue.PeekByLookupId(lookupId));
+ }
+
+ /// <summary>
+ /// Receives the message in the queue referenced by the MessageQueue
+ /// having the lookup identifier given at construction time.
+ /// As noted on the time-out overloads, the lookup throws
+ /// immediately if the message is not in the queue.
+ /// </summary>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive()
+ {
+ return Convert(messageQueue.ReceiveByLookupId(lookupId));
+ }
+
+ /// <summary>
+ /// Receives the message in the queue referenced by the MessageQueue
+ /// having the lookup identifier given at construction time. The
+ /// time-out is not used: ReceiveByLookupId is given no time-out.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out (ignored).</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(TimeSpan timeSpan)
+ {
+ // No time-out option for receiving messages by lookup identifiers:
+ // either the message is present in the queue, or the method throws
+ // an exception immediately if the message is not in the queue.
+ return Convert(messageQueue.ReceiveByLookupId(lookupId));
+ }
+
+ /// <summary>
+ /// Receives the message in the transactional queue referenced by
+ /// the MessageQueue having the lookup identifier given at
+ /// construction time, using MessageLookupAction.Current and the
+ /// specified transaction.
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(MessageQueueTransaction transaction)
+ {
+ return Convert(messageQueue.ReceiveByLookupId(
+ MessageLookupAction.Current, lookupId, transaction));
+ }
+
+ /// <summary>
+ /// Receives the message in the transactional queue referenced by
+ /// the MessageQueue having the lookup identifier given at
+ /// construction time, using MessageLookupAction.Current and the
+ /// specified transaction. The time-out is not used.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out (ignored).</param>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ public override IMessage Receive(TimeSpan timeSpan,
+ MessageQueueTransaction transaction)
+ {
+ // No time-out option for receiving messages by lookup identifiers:
+ // either the message is present in the queue, or the method throws
+ // an exception immediately if the message is not in the queue.
+ return Convert(messageQueue.ReceiveByLookupId(
+ MessageLookupAction.Current, lookupId, transaction));
+ }
+
+ /// <summary>
+ /// Checks if an MSMQ message has the expected lookup identifier.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <returns>true if the message matches the selection criteria.</returns>
+ public override bool Matches(Message message)
+ {
+ return message.LookupId == lookupId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/BySelectorMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/BySelectorMessageReader.cs b/src/main/csharp/Readers/BySelectorMessageReader.cs
new file mode 100644
index 0000000..e7cd5c3
--- /dev/null
+++ b/src/main/csharp/Readers/BySelectorMessageReader.cs
@@ -0,0 +1,290 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+using Apache.NMS;
+using Apache.NMS.Selector;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning only the messages matching the
+    /// specified selector.
+    /// </summary>
+    /// <remarks>
+    /// Candidates are found by walking the queue with an MSMQ cursor and
+    /// evaluating the parsed selector against each peeked message. A single
+    /// evaluation context is reused for every evaluation.
+    /// NOTE(review): because of that shared context, instances do not look
+    /// safe for concurrent use - confirm against callers.
+    /// </remarks>
+    public class BySelectorMessageReader : AbstractMessageReader
+    {
+        private readonly string selector;
+        private readonly MessageEvaluationContext evaluationContext;
+        private readonly IBooleanExpression selectionExpression;
+
+        /// <summary>
+        /// Constructor. Parses the selector once, so that it can be
+        /// evaluated against every candidate message.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="selector">The selector string.</param>
+        public BySelectorMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string selector)
+            : base(messageQueue, messageConverter)
+        {
+            this.selector = selector;
+
+            SelectorParser selectorParser = new SelectorParser();
+            selectionExpression = selectorParser.Parse(selector);
+
+            // The context starts without a message; the candidate message
+            // is assigned right before each evaluation.
+            evaluationContext = new MessageEvaluationContext(null);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return InternalPeek(DateTime.MaxValue, true);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message, or null if the time-out expired before
+        /// a matching message was found.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return InternalPeek(GetDeadline(timeSpan), true);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return InternalReceive(DateTime.MaxValue, null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message, or null if the time-out expired before
+        /// a matching message was found.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return InternalReceive(GetDeadline(timeSpan), null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return InternalReceive(DateTime.MaxValue, transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message, or null if the time-out expired before
+        /// a matching message was found.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return InternalReceive(GetDeadline(timeSpan), transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria, and waits until
+        /// either a message is available in the queue, or the deadline is
+        /// reached. The message is first peeked, then removed from the queue
+        /// through its lookup identifier.
+        /// </summary>
+        /// <param name="maxTime">Deadline (local time, compared against
+        /// DateTime.Now) after which the method gives up.</param>
+        /// <param name="transaction">Transaction, or null for a
+        /// non-transactional reception.</param>
+        /// <returns>Received message, or null if no matching message was
+        /// found before the deadline.</returns>
+        public IMessage InternalReceive(DateTime maxTime,
+            MessageQueueTransaction transaction)
+        {
+            // In a shared connection / multi-consumer context, the message may
+            // have been consumed by another client, after it was peeked but
+            // before it was received by this client. Hence the loop.
+            // (not sure it can be shared AND transactional, though).
+            while(true)
+            {
+                // The body is not converted at this stage: the peeked message
+                // only serves to obtain the lookup identifier.
+                IMessage peekedMessage = InternalPeek(maxTime, false);
+
+                if(peekedMessage == null)
+                {
+                    return null;
+                }
+
+                try
+                {
+                    // NOTE(review): assumes the converter publishes the MSMQ
+                    // lookup identifier as the "LookupId" property - confirm
+                    // against the message converter.
+                    long lookupId = peekedMessage.Properties.GetLong("LookupId");
+
+                    Message message = (transaction == null ?
+                        messageQueue.ReceiveByLookupId(lookupId) :
+                        messageQueue.ReceiveByLookupId(
+                            MessageLookupAction.Current, lookupId, transaction));
+
+                    return Convert(message);
+                }
+                catch(InvalidOperationException)
+                {
+                    // Deliberately ignored: the peeked message is presumably
+                    // gone (taken by another consumer), so peek again.
+                    // TODO: filter exceptions, catch only exceptions due to
+                    // unknown lookup id.
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue, matching the selection criteria.
+        /// </summary>
+        /// <param name="maxTime">Deadline (local time, compared against
+        /// DateTime.Now) after which the method gives up.</param>
+        /// <param name="convertBody">true if message body should be converted.</param>
+        /// <returns>Peeked message, or null if no matching message was found
+        /// before the deadline.</returns>
+        private IMessage InternalPeek(DateTime maxTime, bool convertBody)
+        {
+            // The cursor is disposable; the using statement releases it on
+            // every exit path, exceptions included.
+            using(Cursor cursor = messageQueue.CreateCursor())
+            {
+                PeekAction action = PeekAction.Current;
+
+                while(true)
+                {
+                    Message msmqMessage = null;
+
+                    try
+                    {
+                        // The remaining time is recomputed for each peek, so
+                        // that skipping non-matching messages does not extend
+                        // the overall wait beyond the deadline.
+                        msmqMessage = messageQueue.Peek(
+                            GetRemainingTime(maxTime), cursor, action);
+                    }
+                    catch(MessageQueueException exc)
+                    {
+                        // A time-out simply means "no message"; any other
+                        // error is rethrown with its stack trace intact.
+                        if(exc.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
+                        {
+                            throw;
+                        }
+                    }
+
+                    if(msmqMessage == null)
+                    {
+                        return null;
+                    }
+
+                    IMessage nmsMessage = InternalMatch(msmqMessage, convertBody);
+
+                    if(nmsMessage != null)
+                    {
+                        return nmsMessage;
+                    }
+
+                    // Not a match: move the cursor to the next message.
+                    action = PeekAction.Next;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria. If matched
+        /// the method returns the converted NMS message. Else it returns null.
+        /// </summary>
+        /// <param name="message">The MSMQ message to check.</param>
+        /// <param name="convertBody">true if the message body should be
+        /// converted. Only honored when an extended converter is available;
+        /// otherwise the message is always fully converted.</param>
+        /// <returns>The matching message converted to NMS, or null.</returns>
+        private IMessage InternalMatch(Message message, bool convertBody)
+        {
+            IMessage nmsMessage = ConvertForSelection(message);
+
+            evaluationContext.Message = nmsMessage;
+
+            if(!selectionExpression.Matches(evaluationContext))
+            {
+                return null;
+            }
+
+            // With an extended converter the body was left out so far; it is
+            // converted only now that the message is known to match.
+            if(messageConverterEx != null && convertBody)
+            {
+                messageConverterEx.ConvertMessageBodyToNMS(message, nmsMessage);
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public override bool Matches(Message message)
+        {
+            evaluationContext.Message = ConvertForSelection(message);
+
+            return selectionExpression.Matches(evaluationContext);
+        }
+
+        /// <summary>
+        /// Converts an MSMQ message to the NMS message the selector is
+        /// evaluated against. When an extended converter is available, the
+        /// conversion is requested with the "false" flag, as a partial
+        /// conversion leaving the body out; otherwise the regular converter
+        /// performs a full conversion.
+        /// </summary>
+        /// <param name="message">The MSMQ message to convert.</param>
+        /// <returns>The NMS message to evaluate.</returns>
+        private IMessage ConvertForSelection(Message message)
+        {
+            return messageConverterEx == null ?
+                messageConverter.ToNmsMessage(message) :
+                messageConverterEx.ToNmsMessage(message, false);
+        }
+
+        /// <summary>
+        /// Turns a relative time-out into an absolute deadline, saturating
+        /// instead of overflowing when the time-out is too large to be
+        /// added to the current time.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>The deadline, expressed in local time.</returns>
+        private static DateTime GetDeadline(TimeSpan timeSpan)
+        {
+            DateTime now = DateTime.Now;
+
+            if(timeSpan <= TimeSpan.Zero)
+            {
+                // Already expired: a single non-blocking attempt is made.
+                return now;
+            }
+
+            if(timeSpan >= DateTime.MaxValue - now)
+            {
+                return DateTime.MaxValue;
+            }
+
+            return now + timeSpan;
+        }
+
+        /// <summary>
+        /// Computes the time left until a deadline, clamped to the range
+        /// of time-outs accepted by the MessageQueue peek operation, i.e.
+        /// from TimeSpan.Zero to MessageQueue.InfiniteTimeout.
+        /// </summary>
+        /// <param name="maxTime">Deadline, expressed in local time.</param>
+        /// <returns>The time-out to pass to the next peek operation.</returns>
+        private static TimeSpan GetRemainingTime(DateTime maxTime)
+        {
+            TimeSpan remaining = maxTime - DateTime.Now;
+
+            if(remaining <= TimeSpan.Zero)
+            {
+                return TimeSpan.Zero;
+            }
+
+            // Time-outs above InfiniteTimeout are rejected by MessageQueue
+            // with an ArgumentException, which is what a DateTime.MaxValue
+            // deadline would otherwise produce.
+            if(remaining > MessageQueue.InfiniteTimeout)
+            {
+                return MessageQueue.InfiniteTimeout;
+            }
+
+            return remaining;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/IMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/IMessageReader.cs b/src/main/csharp/Readers/IMessageReader.cs
new file mode 100644
index 0000000..8168664
--- /dev/null
+++ b/src/main/csharp/Readers/IMessageReader.cs
@@ -0,0 +1,93 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+ /// <summary>
+ /// MSMQ message reader: peeks or receives messages from an MSMQ queue,
+ /// converted to NMS messages, optionally restricted to the messages
+ /// matching the selection criteria of the implementation.
+ /// </summary>
+ public interface IMessageReader
+ {
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue matching the selection criteria.
+ /// The Peek method is synchronous, so it blocks the current thread
+ /// until a message becomes available.
+ /// </summary>
+ /// <returns>Peeked message.</returns>
+ IMessage Peek();
+
+ /// <summary>
+ /// Returns without removing (peeks) the first message in the queue
+ /// referenced by this MessageQueue matching the selection criteria.
+ /// The Peek method is synchronous, so it blocks the current thread
+ /// until a message becomes available or the specified time-out occurs.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Peeked message.
+ /// NOTE(review): the result on time-out (null or exception) is not
+ /// specified here - confirm against the implementations.</returns>
+ IMessage Peek(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by
+ /// the MessageQueue matching the selection criteria.
+ /// This call is synchronous, and blocks the current thread of execution
+ /// until a message is available.
+ /// </summary>
+ /// <returns>Received message.</returns>
+ IMessage Receive();
+
+ /// <summary>
+ /// Receives the first message available in the queue referenced by the
+ /// MessageQueue matching the selection criteria, and waits until either
+ /// a message is available in the queue, or the time-out expires.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <returns>Received message.
+ /// NOTE(review): the result on time-out (null or exception) is not
+ /// specified here - confirm against the implementations.</returns>
+ IMessage Receive(TimeSpan timeSpan);
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue matching the selection criteria.
+ /// This call is synchronous, and blocks the current thread of execution
+ /// until a message is available.
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.</returns>
+ IMessage Receive(MessageQueueTransaction transaction);
+
+ /// <summary>
+ /// Receives the first message available in the transactional queue
+ /// referenced by the MessageQueue matching the selection criteria,
+ /// and waits until either a message is available in the queue, or the
+ /// time-out expires.
+ /// </summary>
+ /// <param name="timeSpan">Reception time-out.</param>
+ /// <param name="transaction">Transaction.</param>
+ /// <returns>Received message.
+ /// NOTE(review): the result on time-out (null or exception) is not
+ /// specified here - confirm against the implementations.</returns>
+ IMessage Receive(TimeSpan timeSpan, MessageQueueTransaction transaction);
+
+ /// <summary>
+ /// Checks if an MSMQ message matches the selection criteria.
+ /// </summary>
+ /// <param name="message">MSMQ message.</param>
+ /// <returns>true if the message matches the selection criteria.</returns>
+ bool Matches(Message message);
+ }
+}