You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/05 14:05:42 UTC

svn commit: r514662 - in /activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ: BaseMessage.cs DefaultMessageConverter.cs IMessageConverter.cs MessageConsumer.cs MessageProducer.cs ObjectMessage.cs Session.cs

Author: jstrachan
Date: Mon Mar  5 05:05:42 2007
New Revision: 514662

URL: http://svn.apache.org/viewvc?view=rev&rev=514662
Log:
added support for receiving messages from MSMQ together with improving the conversions to/from MSMQ and done a little refactor here and there

Modified:
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs Mon Mar  5 05:05:42 2007
@@ -24,7 +24,7 @@
     public class BaseMessage : IMessage
     {
         private PrimitiveMap properties;
-        private Destination destination;
+        private IDestination destination;
         private string correlationId;
         private TimeSpan expiration;
         private string messageId;
@@ -90,6 +90,9 @@
             get {
                 return destination;
             }
+            set {
+                destination = value;
+            }
         }
         
         /// <summary>
@@ -113,6 +116,9 @@
             get {
                 return messageId;
             }
+			set {
+				messageId = value;
+			}
         }
         
         /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs Mon Mar  5 05:05:42 2007
@@ -23,31 +23,75 @@
 {
     public class DefaultMessageConverter : IMessageConverter
 	{
-        public Message convertToMSMQMessage(IMessage message)
+        public virtual Message ToMsmqMessage(IMessage message)
         {
-            Message msg = new Message();
+            Message answer = new Message();
             MessageQueue responseQueue=null;
             if (message.NMSReplyTo != null)
             {
-                responseQueue = new MessageQueue(((Destination)message.NMSReplyTo).Path);
+                IDestination destination = message.NMSReplyTo;
+				responseQueue = ToMsmqDestination(destination);
             }
             //if (message.NMSExpiration != null)
             //{
-                msg.TimeToBeReceived = message.NMSExpiration;
+                answer.TimeToBeReceived = message.NMSExpiration;
             //}
             if (message.NMSCorrelationID != null)
             {
-                msg.CorrelationId = message.NMSCorrelationID;
+                answer.CorrelationId = message.NMSCorrelationID;
             }
-            msg.Recoverable = message.NMSPersistent;
-            msg.Priority = MessagePriority.Normal;
-            msg.ResponseQueue = responseQueue;
-
-            return msg;
+            answer.Recoverable = message.NMSPersistent;
+            answer.Priority = MessagePriority.Normal;
+            answer.ResponseQueue = responseQueue;
+			answer.Label = message.NMSType;
+            return answer;
         }
-        public IMessage convertFromMSMQMessage(Message message)
+		
+        public virtual IMessage ToNmsMessage(Message message)
         {
-            return null;
+			BaseMessage answer = CreateNmsMessage(message);
+			answer.NMSMessageId = message.Id;
+			if (message.CorrelationId != null)
+			{
+				answer.NMSCorrelationID = message.CorrelationId;
+			}
+			answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
+			answer.NMSType = message.Label;
+			answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
+			answer.NMSExpiration = message.TimeToBeReceived;
+            return answer;
         }
+		
+		
+		public MessageQueue ToMsmqDestination(IDestination destination)
+		{
+			return new MessageQueue((destination as Destination).Path);
+		}
+
+		protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
+		{
+			if (destinationQueue == null)
+			{
+				return null;
+			}
+			return new Queue(destinationQueue.Path);
+		}
+	
+		protected virtual BaseMessage CreateNmsMessage(Message message)
+		{
+			object body = message.Body;
+			if (body == null)
+			{
+				return new BaseMessage();
+			}
+			else if (body is string)
+			{
+				return new TextMessage(body as string);
+			}
+			else
+			{
+				return new ObjectMessage(body);
+			}
+		}
 	}
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs Mon Mar  5 05:05:42 2007
@@ -21,7 +21,15 @@
 {
     public interface IMessageConverter
     {
-        Message convertToMSMQMessage(IMessage message);
-        IMessage convertFromMSMQMessage(Message message);
+		
+		/// <summary>
+	/// Method ToMSMQMessageQueue
+/// </summary>
+ /// <param name="destination">An IDestination</param>
+ /// <returns>A  MessageQueue</retutns>
+MessageQueue ToMsmqDestination(IDestination destination);
+
+        Message ToMsmqMessage(IMessage message);
+        IMessage ToNmsMessage(Message message);
     }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageConsumer.cs Mon Mar  5 05:05:42 2007
@@ -14,8 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System;
+using ActiveMQ;
+using ActiveMQ.Util;
 using NMS;
+using System;
+using System.Messaging;
 using System.Threading;
 
 namespace MSMQ
@@ -25,32 +28,50 @@
     /// </summary>
     public class MessageConsumer : IMessageConsumer
     {
+		protected const TimeSpan zeroTimeout = new TimeSpan(0);
+		
         private readonly Session session;
         private readonly AcknowledgementMode acknowledgementMode;
-
-        public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode)
+		private MessageQueue messageQueue;
+		private event MessageListener listener;
+		private AtomicBoolean asyncDelivery = new AtomicBoolean(false);
+		
+        public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
         {
             this.session = session;
-            this.acknowledgementMode = acknowledgementMode;            
+            this.acknowledgementMode = acknowledgementMode;
+			this.messageQueue = messageQueue;
         }
-
+        
+        public event MessageListener Listener
+        {
+			add {
+				listener += value;
+				StartAsyncDelivery();
+			}
+			remove {
+				listener -= value;
+			}
+        }
+		
         public IMessage Receive()
         {
-            throw new NotImplementedException();
+			Message message = messageQueue.Receive();
+			return ToNmsMessage(message);
         }
 
         public IMessage Receive(TimeSpan timeout)
         {
-            throw new NotImplementedException();
+			Message message = messageQueue.Receive(timeout);
+			return ToNmsMessage(message);
         }
 
         public IMessage ReceiveNoWait()
         {
-            throw new NotImplementedException();
+			Message message = messageQueue.Receive(zeroTimeout);
+			return ToNmsMessage(message);
         }
 
-        public event MessageListener Listener;
-
         public void Dispose()
         {
             throw new NotImplementedException();
@@ -58,8 +79,64 @@
 
         public void Close()
         {
+			StopmAsyncDelivery();
             Dispose();
         }
-
+		
+		public void StopmAsyncDelivery()
+		{
+			asyncDelivery.Value = true;
+		}
+
+		protected virtual void StartAsyncDelivery()
+		{
+			if (asyncDelivery.CompareAndSet(false, true)) {
+				Thread thread = new Thread(DispatchLoop);
+				thread.Start();
+			}
+		}
+		
+		protected virtual void DispatchLoop()
+		{
+			Tracer.Info("Starting dispatcher thread consumer: " + this);
+			while (asyncDelivery.Value)
+			{
+				IMessage message = Receive();
+				if (message != null)
+				{
+					try
+					{
+						listener(message);
+					}
+					catch (Exception e)
+					{
+						HandleAsyncException(e);
+					}
+				}
+			}
+			Tracer.Info("Stopping dispatcher thread consumer: " + this);
+		}
+
+		protected virtual void HandleAsyncException(Exception e)
+		{
+			ExceptionListener exceptionListener = session.Connection.ExceptionListener;
+			if (exceptionListener != null)
+			{
+				exceptionListener(e);
+			}
+			else
+			{
+				Tracer.Error(e);
+			}
+		}
+		
+		protected virtual IMessage ToNmsMessage(Message message)
+		{
+			if (message == null)
+			{
+				return null;
+			}
+			return session.MessageConverter.ToNmsMessage(message);
+		}
     }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/MessageProducer.cs Mon Mar  5 05:05:42 2007
@@ -127,7 +127,7 @@
                 message.NMSPriority = priority;
                 
                 // message.NMSTimestamp = new DateTime().Date.;
-                Message msg = messageConverter.convertToMSMQMessage(message);
+                Message msg = messageConverter.ToMsmqMessage(message);
                 // TODO: message.NMSMessageId =
                 // Now Send the message
                 if( mq.Transactional )

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ObjectMessage.cs Mon Mar  5 05:05:42 2007
@@ -29,11 +29,20 @@
 {
     public class ObjectMessage : BaseMessage, IObjectMessage
     {
-		#if !(PocketPC||NETCF||NETCF_2_0)
 		private object body;
+		#if !(PocketPC||NETCF||NETCF_2_0)
 		private IFormatter formatter;
 		#endif
 		
+		public ObjectMessage()
+		{
+		}
+		
+		public ObjectMessage(object body)
+		{
+			this.body = body;
+		}
+		
 		public object Body
 		{
 			get {
@@ -42,10 +51,9 @@
 				{
 					body = Formatter.Deserialize(new MemoryStream(Content));
 				}
-				return body;
 				#else
-                throw new NotImplementedException();
 				#endif
+				return body;
 			}
 			
 			set {
@@ -73,6 +81,7 @@
 				formatter = value;
 			}
 		}
+	
 		#endif
 	}
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs?view=diff&rev=514662&r1=514661&r2=514662
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Session.cs Mon Mar  5 05:05:42 2007
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using System.Messaging;
 using NMS;
 using System;
+using System.Messaging;
 
 namespace MSMQ
 {
@@ -66,17 +66,22 @@
         
         public IMessageConsumer CreateConsumer(IDestination destination, string selector)
         {
-            return new MessageConsumer(this, acknowledgementMode);
+            return CreateConsumer(destination, selector, false);
         }
 
         public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
         {
-            throw new NotImplementedException();
+			if (selector != null)
+			{
+				throw new NotImplementedException("Selectors are not supported by MSQM");
+			}
+			MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
+            return new MessageConsumer(this, acknowledgementMode, queue);
         }
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
         {
-            return null;
+            throw new NotImplementedException("Durable Topic subscribers are not supported by MSMQ");
         }
         
         public IQueue GetQueue(string name)