You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/05 14:05:42 UTC
svn commit: r514662 - in
/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ: BaseMessage.cs
DefaultMessageConverter.cs IMessageConverter.cs MessageConsumer.cs
MessageProducer.cs ObjectMessage.cs Session.cs
Author: jstrachan
Date: Mon Mar 5 05:05:42 2007
New Revision: 514662
URL: http://svn.apache.org/viewvc?view=rev&rev=514662
Log:
added support for receiving messages from MSMQ together with improving the conversions to/from MSMQ and done a little refactor here and there
Modified:
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs
activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs Mon Mar 5 05:05:42 2007
@@ -24,7 +24,7 @@
public class BaseMessage : IMessage
{
private PrimitiveMap properties;
- private Destination destination;
+ private IDestination destination;
private string correlationId;
private TimeSpan expiration;
private string messageId;
@@ -90,6 +90,9 @@
get {
return destination;
}
+ set {
+ destination = value;
+ }
}
/// <summary>
@@ -113,6 +116,9 @@
get {
return messageId;
}
+ set {
+ messageId = value;
+ }
}
/// <summary>
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs Mon Mar 5 05:05:42 2007
@@ -23,31 +23,75 @@
{
public class DefaultMessageConverter : IMessageConverter
{
- public Message convertToMSMQMessage(IMessage message)
+ public virtual Message ToMsmqMessage(IMessage message)
{
- Message msg = new Message();
+ Message answer = new Message();
MessageQueue responseQueue=null;
if (message.NMSReplyTo != null)
{
- responseQueue = new MessageQueue(((Destination)message.NMSReplyTo).Path);
+ IDestination destination = message.NMSReplyTo;
+ responseQueue = ToMsmqDestination(destination);
}
//if (message.NMSExpiration != null)
//{
- msg.TimeToBeReceived = message.NMSExpiration;
+ answer.TimeToBeReceived = message.NMSExpiration;
//}
if (message.NMSCorrelationID != null)
{
- msg.CorrelationId = message.NMSCorrelationID;
+ answer.CorrelationId = message.NMSCorrelationID;
}
- msg.Recoverable = message.NMSPersistent;
- msg.Priority = MessagePriority.Normal;
- msg.ResponseQueue = responseQueue;
-
- return msg;
+ answer.Recoverable = message.NMSPersistent;
+ answer.Priority = MessagePriority.Normal;
+ answer.ResponseQueue = responseQueue;
+ answer.Label = message.NMSType;
+ return answer;
}
- public IMessage convertFromMSMQMessage(Message message)
+
+ public virtual IMessage ToNmsMessage(Message message)
{
- return null;
+ BaseMessage answer = CreateNmsMessage(message);
+ answer.NMSMessageId = message.Id;
+ if (message.CorrelationId != null)
+ {
+ answer.NMSCorrelationID = message.CorrelationId;
+ }
+ answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
+ answer.NMSType = message.Label;
+ answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
+ answer.NMSExpiration = message.TimeToBeReceived;
+ return answer;
}
+
+
+ public MessageQueue ToMsmqDestination(IDestination destination)
+ {
+ return new MessageQueue((destination as Destination).Path);
+ }
+
+ protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
+ {
+ if (destinationQueue == null)
+ {
+ return null;
+ }
+ return new Queue(destinationQueue.Path);
+ }
+
+ protected virtual BaseMessage CreateNmsMessage(Message message)
+ {
+ object body = message.Body;
+ if (body == null)
+ {
+ return new BaseMessage();
+ }
+ else if (body is string)
+ {
+ return new TextMessage(body as string);
+ }
+ else
+ {
+ return new ObjectMessage(body);
+ }
+ }
}
}
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs Mon Mar 5 05:05:42 2007
@@ -21,7 +21,15 @@
{
public interface IMessageConverter
{
- Message convertToMSMQMessage(IMessage message);
- IMessage convertFromMSMQMessage(Message message);
+
+ /// <summary>
+ /// Method ToMSMQMessageQueue
+/// </summary>
+ /// <param name="destination">An IDestination</param>
+ /// <returns>A MessageQueue</retutns>
+MessageQueue ToMsmqDestination(IDestination destination);
+
+ Message ToMsmqMessage(IMessage message);
+ IMessage ToNmsMessage(Message message);
}
}
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs Mon Mar 5 05:05:42 2007
@@ -14,8 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System;
+using ActiveMQ;
+using ActiveMQ.Util;
using NMS;
+using System;
+using System.Messaging;
using System.Threading;
namespace MSMQ
@@ -25,32 +28,50 @@
/// </summary>
public class MessageConsumer : IMessageConsumer
{
+ protected const TimeSpan zeroTimeout = new TimeSpan(0);
+
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
-
- public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode)
+ private MessageQueue messageQueue;
+ private event MessageListener listener;
+ private AtomicBoolean asyncDelivery = new AtomicBoolean(false);
+
+ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
{
this.session = session;
- this.acknowledgementMode = acknowledgementMode;
+ this.acknowledgementMode = acknowledgementMode;
+ this.messageQueue = messageQueue;
}
-
+
+ public event MessageListener Listener
+ {
+ add {
+ listener += value;
+ StartAsyncDelivery();
+ }
+ remove {
+ listener -= value;
+ }
+ }
+
public IMessage Receive()
{
- throw new NotImplementedException();
+ Message message = messageQueue.Receive();
+ return ToNmsMessage(message);
}
public IMessage Receive(TimeSpan timeout)
{
- throw new NotImplementedException();
+ Message message = messageQueue.Receive(timeout);
+ return ToNmsMessage(message);
}
public IMessage ReceiveNoWait()
{
- throw new NotImplementedException();
+ Message message = messageQueue.Receive(zeroTimeout);
+ return ToNmsMessage(message);
}
- public event MessageListener Listener;
-
public void Dispose()
{
throw new NotImplementedException();
@@ -58,8 +79,64 @@
public void Close()
{
+ StopmAsyncDelivery();
Dispose();
}
-
+
+ public void StopmAsyncDelivery()
+ {
+ asyncDelivery.Value = true;
+ }
+
+ protected virtual void StartAsyncDelivery()
+ {
+ if (asyncDelivery.CompareAndSet(false, true)) {
+ Thread thread = new Thread(DispatchLoop);
+ thread.Start();
+ }
+ }
+
+ protected virtual void DispatchLoop()
+ {
+ Tracer.Info("Starting dispatcher thread consumer: " + this);
+ while (asyncDelivery.Value)
+ {
+ IMessage message = Receive();
+ if (message != null)
+ {
+ try
+ {
+ listener(message);
+ }
+ catch (Exception e)
+ {
+ HandleAsyncException(e);
+ }
+ }
+ }
+ Tracer.Info("Stopping dispatcher thread consumer: " + this);
+ }
+
+ protected virtual void HandleAsyncException(Exception e)
+ {
+ ExceptionListener exceptionListener = session.Connection.ExceptionListener;
+ if (exceptionListener != null)
+ {
+ exceptionListener(e);
+ }
+ else
+ {
+ Tracer.Error(e);
+ }
+ }
+
+ protected virtual IMessage ToNmsMessage(Message message)
+ {
+ if (message == null)
+ {
+ return null;
+ }
+ return session.MessageConverter.ToNmsMessage(message);
+ }
}
}
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs Mon Mar 5 05:05:42 2007
@@ -127,7 +127,7 @@
message.NMSPriority = priority;
// message.NMSTimestamp = new DateTime().Date.;
- Message msg = messageConverter.convertToMSMQMessage(message);
+ Message msg = messageConverter.ToMsmqMessage(message);
// TODO: message.NMSMessageId =
// Now Send the message
if( mq.Transactional )
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs Mon Mar 5 05:05:42 2007
@@ -29,11 +29,20 @@
{
public class ObjectMessage : BaseMessage, IObjectMessage
{
- #if !(PocketPC||NETCF||NETCF_2_0)
private object body;
+ #if !(PocketPC||NETCF||NETCF_2_0)
private IFormatter formatter;
#endif
+ public ObjectMessage()
+ {
+ }
+
+ public ObjectMessage(object body)
+ {
+ this.body = body;
+ }
+
public object Body
{
get {
@@ -42,10 +51,9 @@
{
body = Formatter.Deserialize(new MemoryStream(Content));
}
- return body;
#else
- throw new NotImplementedException();
#endif
+ return body;
}
set {
@@ -73,6 +81,7 @@
formatter = value;
}
}
+
#endif
}
}
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs Mon Mar 5 05:05:42 2007
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-using System.Messaging;
using NMS;
using System;
+using System.Messaging;
namespace MSMQ
{
@@ -66,17 +66,22 @@
public IMessageConsumer CreateConsumer(IDestination destination, string selector)
{
- return new MessageConsumer(this, acknowledgementMode);
+ return CreateConsumer(destination, selector, false);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
{
- throw new NotImplementedException();
+ if (selector != null)
+ {
+ throw new NotImplementedException("Selectors are not supported by MSQM");
+ }
+ MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
+ return new MessageConsumer(this, acknowledgementMode, queue);
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
- return null;
+ throw new NotImplementedException("Durable Topic subscribers are not supported by MSMQ");
}
public IQueue GetQueue(string name)