You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/06 23:23:35 UTC

svn commit: r1556053 - in /activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp: DefaultMessageConverter.cs MessageConsumer.cs MessageProducer.cs

Author: tabish
Date: Mon Jan  6 22:23:35 2014
New Revision: 1556053

URL: http://svn.apache.org/r1556053
Log:
https://issues.apache.org/jira/browse/AMQNET-454

apply patch: https://issues.apache.org/jira/secure/attachment/12621690/Apache.NMS.AMQP-add-message-conversions-06.patch

Modified:
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1556053&r1=1556052&r2=1556053&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Mon Jan  6 22:23:35 2014
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 using System;
+using System.Collections;
+using System.Collections.Generic;
 using System.IO;
 using System.Text;
 using Apache.NMS.Util;
@@ -22,28 +24,272 @@ using Org.Apache.Qpid.Messaging;
 
 namespace Apache.NMS.Amqp
 {
-	public enum NMSMessageType
-	{
-		BaseMessage,
-		TextMessage,
-		BytesMessage,
-		ObjectMessage,
-		MapMessage,
-		StreamMessage
-	}
-
-	public class DefaultMessageConverter : IMessageConverter
-	{
-		public virtual Message ToAmqpMessage(IMessage message)
-		{
-			Message amqpMessage = new Message();
-			return amqpMessage;
-		}
-
-		public virtual IMessage ToNmsMessage(Message message)
-		{
-            BaseMessage answer = null; // CreateNmsMessage(message);
-			return answer;
-		}
-	}
+    public enum NMSMessageType // Member names mirror the NMS message classes tested in CreateAmqpMessage.
+    {
+        BaseMessage,
+        TextMessage,
+        BytesMessage,
+        ObjectMessage,
+        MapMessage,
+        StreamMessage
+    }
+
+    public class DefaultMessageConverter : IMessageConverter
+    {
+        #region IMessageConverter Members
+        // NMS Message                       AMQP Message
+        // ================================  =================
+        // string          NMSCorrelationID  string     CorrelationId
+        // MsgDeliveryMode NMSDeliveryMode   bool       Durable
+        // IDestination    NMSDestination
+        // string          NMSMessageId      string     MessageId
+        // MsgPriority     NMSPriority       byte       Priority
+        // bool            NMSRedelivered    bool       Redelivered
+        // IDestination    NMSReplyTo        Address    ReplyTo
+        // DateTime        NMSTimestamp
+        // TimeSpan        NMSTimeToLive     Duration   Ttl
+        // string          NMSType           string     ContentType
+        // IPrimitiveMap   Properties        Dictionary Properties
+        //                                   string     Subject
+        //                                   string     UserId
+        //
+        /// <summary>Converts an NMS message to a Qpid AMQP message: body first, then headers and properties.</summary>
+        public virtual Message ToAmqpMessage(IMessage message)
+        {
+            // CreateAmqpMessage chooses the body representation from the concrete NMS type.
+            Message outbound = CreateAmqpMessage(message);
+
+            // Reference-typed headers are copied only when set; value-typed ones are always copied.
+            string correlationId = message.NMSCorrelationID;
+            if (correlationId != null) { outbound.CorrelationId = correlationId; }
+
+            bool persistent = message.NMSDeliveryMode == MsgDeliveryMode.Persistent;
+            outbound.Durable = persistent;
+
+            string messageId = message.NMSMessageId;
+            if (messageId != null) { outbound.MessageId = messageId; }
+
+            outbound.Priority = ToAmqpMessagePriority(message.NMSPriority);
+            outbound.Redelivered = message.NMSRedelivered;
+
+            IDestination replyTo = message.NMSReplyTo;
+            if (replyTo != null) { outbound.ReplyTo = ToAmqpAddress(replyTo); }
+
+            // A zero time-to-live is skipped, leaving Ttl at whatever the new message starts with.
+            TimeSpan timeToLive = message.NMSTimeToLive;
+            if (timeToLive != TimeSpan.Zero) { outbound.Ttl = ToQpidDuration(timeToLive); }
+
+            string nmsType = message.NMSType;
+            if (nmsType != null) { outbound.ContentType = nmsType; }
+
+            // Application properties are copied into a fresh dictionary.
+            outbound.Properties = FromNmsPrimitiveMap(message.Properties);
+
+            // TODO: NMSDestination, Amqp.Subject, Amqp.UserId
+            return outbound;
+        }
+
+        /// <summary>Converts a received Qpid AMQP message to an NMS message; returns null when no NMS body can be created.</summary>
+        public virtual IMessage ToNmsMessage(Message message)
+        {
+            BaseMessage answer = CreateNmsMessage(message);
+            // CreateNmsMessage returns null for content types it does not convert yet; copying headers would then throw.
+            if (answer == null) { return null; }
+            try
+            {
+                answer.NMSCorrelationID = message.CorrelationId;
+                answer.NMSDeliveryMode = (message.Durable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
+                answer.NMSMessageId = message.MessageId; // not Subject: ToAmqpMessage writes the id to MessageId
+                answer.NMSPriority = ToNmsPriority(message.Priority);
+                answer.NMSRedelivered = message.Redelivered;
+                answer.NMSReplyTo = ToNmsDestination(message.ReplyTo);
+                answer.NMSTimeToLive = ToNMSTimespan(message.Ttl);
+                answer.NMSType = message.ContentType;
+                SetNmsPrimitiveMap(answer.Properties, message.Properties);
+                // TODO: NMSDestination, NMSTimestamp, Properties
+            }
+            catch (InvalidOperationException e)
+            {
+                // Still best-effort (the partially filled message is returned), but no longer silent.
+                Tracer.WarnFormat("ToNmsMessage: header conversion stopped early: {0}", e.Message);
+            }
+            return answer;
+        }
+        #endregion
+
+        #region MessagePriority Methods
+        /// <summary>Maps an NMS priority to the AMQP priority byte by casting the enum's numeric value.</summary>
+        private static byte ToAmqpMessagePriority(MsgPriority msgPriority)
+        {
+            return (byte)msgPriority;
+        }
+
+        /// <summary>Maps an AMQP priority byte to an NMS priority, capping values above MsgPriority.Highest.</summary>
+        private static MsgPriority ToNmsPriority(byte qpidMsgPriority)
+        {
+            // Values above Highest are capped rather than cast.
+            byte ceiling = (byte)MsgPriority.Highest;
+            return (qpidMsgPriority > ceiling)
+                ? MsgPriority.Highest
+                : (MsgPriority)qpidMsgPriority;
+        }
+        #endregion
+
+        #region Duration Methods
+        /// <summary>Converts a TimeSpan to a Qpid Duration, clamped between IMMEDIATE and FORVER.</summary>
+        private static Duration ToQpidDuration(TimeSpan timespan)
+        {
+            double millis = timespan.TotalMilliseconds;
+            // Zero and negative spans both map to IMMEDIATE.
+            if (millis <= 0)
+            {
+                return DurationConstants.IMMEDIATE;
+            }
+
+            // Spans longer than FORVER saturate to it.
+            // NOTE(review): "FORVER" is the identifier this code references — confirm against the Qpid binding before renaming.
+            if (millis > (Double)DurationConstants.FORVER.Milliseconds)
+            {
+                return DurationConstants.FORVER;
+            }
+
+            return new Duration((UInt64)millis);
+        }
+
+        /// <summary>Converts a Qpid Duration (milliseconds) to a TimeSpan, saturating at TimeSpan.MaxValue.</summary>
+        private static TimeSpan ToNMSTimespan(Duration duration)
+        {
+            // TimeSpan(Int64) counts 100 ns ticks, not milliseconds, so the value must be scaled;
+            // passing milliseconds straight through made every time-to-live 10,000 times too short.
+            const Int64 MaxWholeMilliseconds = Int64.MaxValue / TimeSpan.TicksPerMillisecond;
+
+            if (duration.Milliseconds > MaxWholeMilliseconds)
+            {
+                // Scaling would overflow the Int64 tick count; clamp to the largest TimeSpan.
+                return TimeSpan.MaxValue;
+            }
+            return new TimeSpan((Int64)duration.Milliseconds * TimeSpan.TicksPerMillisecond);
+        }
+        #endregion
+
+        #region MessageBody Conversion Methods
+        /// <summary>Creates the Qpid message body for an NMS message, chosen by its concrete type; headers are not set here.</summary>
+        /// <exception cref="NotSupportedException">The message is null or of an unknown type, or a map body is not a PrimitiveMap.</exception>
+        protected virtual Message CreateAmqpMessage(IMessage message)
+        {
+            // Each "as" cast is checked once; the test order is the same as before, with BaseMessage last.
+            TextMessage textMessage = message as TextMessage;
+            if (textMessage != null)
+            {
+                return new Message(textMessage.Text);
+            }
+
+            BytesMessage bytesMessage = message as BytesMessage;
+            if (bytesMessage != null)
+            {
+                byte[] bytesBody = bytesMessage.Content ?? new byte[0]; // null content is sent as an empty body
+                return new Message(bytesBody, 0, bytesBody.Length);
+            }
+
+            ObjectMessage objectMessage = message as ObjectMessage;
+            if (objectMessage != null)
+            {
+                return new Message(objectMessage.Body);
+            }
+
+            MapMessage mapMessage = message as MapMessage;
+            if (mapMessage != null)
+            {
+                PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;
+                if (mapBody == null) { throw new NotSupportedException("map message body is not a PrimitiveMap"); }
+                byte[] buf = mapBody.Marshal();
+                return new Message(buf, 0, buf.Length);
+            }
+
+            StreamMessage streamMessage = message as StreamMessage;
+            if (streamMessage != null)
+            {
+                byte[] streamBody = streamMessage.Content ?? new byte[0]; // null content is sent as an empty body
+                return new Message(streamBody, 0, streamBody.Length);
+            }
+
+            // A plain BaseMessage has no body to copy.
+            if (message is BaseMessage) { return new Message(); }
+            throw new NotSupportedException("unhandled message type");
+        }
+
+        /// <summary>Creates the NMS message that will carry the body of a received Qpid message.</summary>
+        /// <returns>A TextMessage holding the content, or null for "amqp/map" and "amqp/list" (not converted yet).</returns>
+        protected virtual BaseMessage CreateNmsMessage(Message message)
+        {
+            string contentType = message.ContentType;
+            if (contentType == "amqp/map")
+            {
+                // TODO: Return map message
+                return null;
+            }
+            if (contentType == "amqp/list")
+            {
+                // TODO: Return list message
+                return null;
+            }
+
+            // Every other content type is surfaced as text.
+            TextMessage textBody = new TextMessage();
+            textBody.Text = message.GetContent();
+            return textBody;
+        }
+        #endregion
+
+        #region Address/Destination Conversion Methods
+        /// <summary>Converts an NMS destination to a Qpid Address built from its Path; null maps to null.</summary>
+        public Address ToAmqpAddress(IDestination destination)
+        {
+            if (destination == null) { return null; }
+            // Path is read from the Destination class; fail clearly for any other IDestination instead of a NullReferenceException.
+            Destination pathSource = destination as Destination;
+            if (pathSource == null) { throw new ArgumentException("unsupported destination type: " + destination.GetType().FullName, "destination"); }
+            return new Address(pathSource.Path);
+        }
+
+        /// <summary>Wraps a Qpid Address in an NMS Queue named by the address's ToString(); null maps to null.</summary>
+        protected virtual IDestination ToNmsDestination(Address destinationQueue)
+        {
+            // NOTE(review): every non-null address becomes a Queue, whatever kind of node it names —
+            // confirm that is intended.
+            return (destinationQueue == null)
+                ? null
+                : new Queue(destinationQueue.ToString());
+        }
+        #endregion
+
+        #region PrimitiveMap Conversion Methods
+
+        /// <summary>Replaces the contents of an NMS primitive map with the entries of an AMQP property dictionary.</summary>
+        public void SetNmsPrimitiveMap(IPrimitiveMap map, Dictionary<string, object> dict)
+        {
+            // TODO: lock?
+            map.Clear();
+
+            // Existing entries were dropped above, so the map ends up holding exactly dict's entries.
+            foreach (string key in dict.Keys)
+            {
+                map[key] = dict[key];
+            }
+        }
+
+        /// <summary>Copies an NMS primitive map into a new string-keyed dictionary for use as AMQP properties.</summary>
+        public Dictionary<string, object> FromNmsPrimitiveMap(IPrimitiveMap pm)
+        {
+            Dictionary<string, object> copy = new Dictionary<string, object>();
+            // TODO: lock?
+            foreach (object rawKey in pm.Keys)
+            {
+                string name = rawKey.ToString();
+                // Add (not the indexer) is kept so a duplicate key still throws.
+                copy.Add(name, pm[name]);
+            }
+            return copy;
+        }
+        #endregion
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1556053&r1=1556052&r2=1556053&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Mon Jan  6 22:23:35 2014
@@ -76,7 +76,7 @@ namespace Apache.NMS.Amqp
                 try
                 {
                     // Create qpid sender
-                    Console.WriteLine("Start Consumer Id = " + ConsumerId.ToString());
+                    Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString());
                     if (qpidReceiver == null)
                     {
                         qpidReceiver = session.CreateQpidReceiver(destination.ToString());
@@ -141,8 +141,11 @@ namespace Apache.NMS.Amqp
         {
             IMessage nmsMessage = null;
 
-            // TODO: Receive a message
-
+            Message qpidMessage = new Message();
+            if (qpidReceiver.Fetch(ref qpidMessage))
+            {
+                nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
+            }
             return nmsMessage;
         }
 

Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1556053&r1=1556052&r2=1556053&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Mon Jan  6 22:23:35 2014
@@ -76,7 +76,7 @@ namespace Apache.NMS.Amqp
                 try
                 {
                     // Create qpid sender
-                    Console.WriteLine("Start Producer Id = " + ProducerId.ToString()); 
+                    Tracer.DebugFormat("Start Producer Id = " + ProducerId.ToString()); 
                     if (qpidSender == null)
                     {
                         qpidSender = session.CreateQpidSender(destination.ToString());
@@ -102,7 +102,7 @@ namespace Apache.NMS.Amqp
             {
                 try
                 {
-                    Console.WriteLine("Stop  Producer Id = " + ProducerId);
+                    Tracer.DebugFormat("Stop  Producer Id = " + ProducerId);
                     qpidSender.Dispose();
                     qpidSender = null;
                 }
@@ -158,7 +158,7 @@ namespace Apache.NMS.Amqp
                 // Convert the Message into a Amqp message
                 Message msg = session.MessageConverter.ToAmqpMessage(message);
 
-                // TODO: send the message!
+                qpidSender.Send(msg);
             }
             catch (Exception e)
             {