You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/06 23:23:35 UTC
svn commit: r1556053 - in
/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp:
DefaultMessageConverter.cs MessageConsumer.cs MessageProducer.cs
Author: tabish
Date: Mon Jan 6 22:23:35 2014
New Revision: 1556053
URL: http://svn.apache.org/r1556053
Log:
https://issues.apache.org/jira/browse/AMQNET-454
apply patch: https://issues.apache.org/jira/secure/attachment/12621690/Apache.NMS.AMQP-add-message-conversions-06.patch
Modified:
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1556053&r1=1556052&r2=1556053&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/DefaultMessageConverter.cs Mon Jan 6 22:23:35 2014
@@ -15,6 +15,8 @@
* limitations under the License.
*/
using System;
+using System.Collections;
+using System.Collections.Generic;
using System.IO;
using System.Text;
using Apache.NMS.Util;
@@ -22,28 +24,272 @@ using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
- public enum NMSMessageType
- {
- BaseMessage,
- TextMessage,
- BytesMessage,
- ObjectMessage,
- MapMessage,
- StreamMessage
- }
-
- public class DefaultMessageConverter : IMessageConverter
- {
- public virtual Message ToAmqpMessage(IMessage message)
- {
- Message amqpMessage = new Message();
- return amqpMessage;
- }
-
- public virtual IMessage ToNmsMessage(Message message)
- {
- BaseMessage answer = null; // CreateNmsMessage(message);
- return answer;
- }
- }
+ /// <summary>
+ /// Identifies the kind of NMS message being handled.
+ /// The numeric values are written out explicitly; they equal the ordinals
+ /// the compiler would assign implicitly.
+ /// </summary>
+ public enum NMSMessageType
+ {
+     BaseMessage = 0,
+     TextMessage = 1,
+     BytesMessage = 2,
+     ObjectMessage = 3,
+     MapMessage = 4,
+     StreamMessage = 5
+ }
+
+ public class DefaultMessageConverter : IMessageConverter
+ {
+ #region IMessageConverter Members
+ // NMS Message AMQP Message
+ // ================================ =================
+ // string NMSCorrelationID string CorrelationId
+ // MsgDeliveryMode NMSDeliveryMode bool Durable
+ // IDestination NMSDestination
+ // string NMSMessageId string MessageId
+ // MsgPriority NMSPriority byte Priority
+ // bool NMSRedelivered bool Redelivered
+ // IDestination NMSReplyTo Address ReplyTo
+ // DateTime NMSTimestamp
+ // TimeSpan NMSTimeToLive Duration Ttl
+ // string NMSType string ContentType
+ // IPrimitiveMap Properties Dictionary Properties
+ // string Subject
+ // string UserId
+ //
+ /// <summary>
+ /// Converts an NMS message into a Qpid AMQP message. The body is produced by
+ /// CreateAmqpMessage; the NMS headers and properties are then copied across.
+ /// </summary>
+ /// <param name="message">The NMS message to convert.</param>
+ /// <returns>The AMQP message carrying the converted body and headers.</returns>
+ public virtual Message ToAmqpMessage(IMessage message)
+ {
+     Message converted = CreateAmqpMessage(message);
+
+     // String headers are copied only when the NMS message supplies a value.
+     string correlationId = message.NMSCorrelationID;
+     if (correlationId != null)
+     {
+         converted.CorrelationId = correlationId;
+     }
+
+     // Persistent delivery maps to a durable AMQP message.
+     converted.Durable = (MsgDeliveryMode.Persistent == message.NMSDeliveryMode);
+
+     string messageId = message.NMSMessageId;
+     if (messageId != null)
+     {
+         converted.MessageId = messageId;
+     }
+
+     converted.Priority = ToAmqpMessagePriority(message.NMSPriority);
+     converted.Redelivered = message.NMSRedelivered;
+
+     IDestination replyTo = message.NMSReplyTo;
+     if (replyTo != null)
+     {
+         converted.ReplyTo = ToAmqpAddress(replyTo);
+     }
+
+     // A zero time-to-live leaves the AMQP Ttl untouched.
+     TimeSpan timeToLive = message.NMSTimeToLive;
+     if (timeToLive != TimeSpan.Zero)
+     {
+         converted.Ttl = ToQpidDuration(timeToLive);
+     }
+
+     // The NMS type string is carried in the AMQP ContentType field.
+     string nmsType = message.NMSType;
+     if (nmsType != null)
+     {
+         converted.ContentType = nmsType;
+     }
+
+     converted.Properties = FromNmsPrimitiveMap(message.Properties);
+
+     // TODO: NMSDestination, Amqp.Subject, Amqp.UserId
+     return converted;
+ }
+
+ /// <summary>
+ /// Converts a received Qpid AMQP message into an NMS message. The body is
+ /// built by CreateNmsMessage, then the AMQP headers and properties are copied
+ /// onto the result.
+ /// </summary>
+ /// <param name="message">The AMQP message to convert.</param>
+ /// <returns>
+ /// The NMS message, or null when CreateNmsMessage produced no message for
+ /// the AMQP content.
+ /// </returns>
+ public virtual IMessage ToNmsMessage(Message message)
+ {
+     BaseMessage answer = CreateNmsMessage(message);
+     if (null == answer)
+     {
+         // CreateNmsMessage can return null; copying headers onto null would
+         // fail with a NullReferenceException, so report it and stop here.
+         Tracer.WarnFormat("No NMS message created for AMQP content type '{0}'", message.ContentType);
+         return null;
+     }
+
+     try
+     {
+         answer.NMSCorrelationID = message.CorrelationId;
+         answer.NMSDeliveryMode = (message.Durable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
+         // The NMS message id pairs with the AMQP MessageId (not Subject),
+         // matching the assignment made in ToAmqpMessage.
+         answer.NMSMessageId = message.MessageId;
+         answer.NMSPriority = ToNmsPriority(message.Priority);
+         answer.NMSRedelivered = message.Redelivered;
+         answer.NMSReplyTo = ToNmsDestination(message.ReplyTo);
+         answer.NMSTimeToLive = ToNMSTimespan(message.Ttl);
+         answer.NMSType = message.ContentType;
+         SetNmsPrimitiveMap(answer.Properties, message.Properties);
+
+         // TODO: NMSDestination, NMSTimestamp, Properties
+     }
+     catch (InvalidOperationException e)
+     {
+         // Best effort: the message is still returned with whatever headers
+         // were copied before the failure, but the failure is now traced
+         // instead of being silently discarded.
+         Tracer.WarnFormat("Failed to copy AMQP headers to NMS message: {0}", e.Message);
+     }
+
+     return answer;
+ }
+ #endregion
+
+ #region MessagePriority Methods
+ /// <summary>
+ /// Maps an NMS priority to the AMQP priority byte by taking the numeric
+ /// value of the enum member.
+ /// </summary>
+ private static byte ToAmqpMessagePriority(MsgPriority msgPriority)
+ {
+     byte amqpPriority = (byte)msgPriority;
+     return amqpPriority;
+ }
+
+ /// <summary>
+ /// Maps an AMQP priority byte to an NMS priority. Values above the numeric
+ /// value of MsgPriority.Highest are clamped to MsgPriority.Highest.
+ /// </summary>
+ private static MsgPriority ToNmsPriority(byte qpidMsgPriority)
+ {
+     return qpidMsgPriority <= (byte)MsgPriority.Highest
+         ? (MsgPriority)qpidMsgPriority
+         : MsgPriority.Highest;
+ }
+ #endregion
+
+ #region Duration Methods
+ /// <summary>
+ /// Converts a TimeSpan to a Qpid Duration measured in milliseconds.
+ /// Zero or negative spans become DurationConstants.IMMEDIATE, spans longer
+ /// than DurationConstants.FORVER become DurationConstants.FORVER, and
+ /// anything else is truncated to whole milliseconds.
+ /// </summary>
+ private static Duration ToQpidDuration(TimeSpan timespan)
+ {
+     double totalMs = timespan.TotalMilliseconds;
+
+     if (totalMs <= 0)
+     {
+         return DurationConstants.IMMEDIATE;
+     }
+
+     if (totalMs > (Double)DurationConstants.FORVER.Milliseconds)
+     {
+         return DurationConstants.FORVER;
+     }
+
+     return new Duration((UInt64)totalMs);
+ }
+
+ /// <summary>
+ /// Converts a Qpid Duration, which is measured in milliseconds, to a TimeSpan.
+ /// Durations longer than a TimeSpan can represent are clamped to
+ /// TimeSpan.MaxValue.
+ /// </summary>
+ /// <param name="duration">The AMQP duration to convert.</param>
+ /// <returns>The equivalent TimeSpan.</returns>
+ private static TimeSpan ToNMSTimespan(Duration duration)
+ {
+     // Largest whole number of milliseconds whose tick count fits in an Int64.
+     const UInt64 maxMilliseconds = (UInt64)(Int64.MaxValue / TimeSpan.TicksPerMillisecond);
+
+     UInt64 milliseconds = duration.Milliseconds;
+     if (milliseconds > maxMilliseconds)
+     {
+         return TimeSpan.MaxValue;
+     }
+
+     // TimeSpan(Int64) counts 100-nanosecond ticks, not milliseconds, so the
+     // value must be scaled; passing milliseconds directly would shrink the
+     // time-to-live by a factor of 10,000.
+     return new TimeSpan((Int64)milliseconds * TimeSpan.TicksPerMillisecond);
+ }
+ #endregion
+
+ #region MessageBody Conversion Methods
+ /// <summary>
+ /// Creates the AMQP message that carries the body of an NMS message. The
+ /// concrete NMS message type selects how the body is encoded: text and
+ /// object bodies are passed to the Message constructor directly, while
+ /// bytes, map (marshalled PrimitiveMap) and stream bodies are sent as byte
+ /// arrays. A plain BaseMessage yields an empty AMQP message.
+ /// </summary>
+ /// <param name="message">The NMS message whose body is converted.</param>
+ /// <returns>A new AMQP message containing the body only.</returns>
+ /// <exception cref="NotSupportedException">
+ /// The message is not one of the handled types, or a MapMessage body is not
+ /// a PrimitiveMap.
+ /// </exception>
+ protected virtual Message CreateAmqpMessage(IMessage message)
+ {
+     // The checks run in the same order as before; each uses a single
+     // 'as' cast plus null test instead of 'is' followed by 'as'.
+     TextMessage textMessage = message as TextMessage;
+     if (null != textMessage)
+     {
+         return new Message(textMessage.Text);
+     }
+
+     BytesMessage bytesMessage = message as BytesMessage;
+     if (null != bytesMessage)
+     {
+         byte[] content = bytesMessage.Content;
+         return new Message(content, 0, content.Length);
+     }
+
+     ObjectMessage objectMessage = message as ObjectMessage;
+     if (null != objectMessage)
+     {
+         return new Message(objectMessage.Body);
+     }
+
+     MapMessage mapMessage = message as MapMessage;
+     if (null != mapMessage)
+     {
+         PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;
+         if (null == mapBody)
+         {
+             // Fail with a clear reason rather than a NullReferenceException
+             // on the Marshal() call below.
+             throw new NotSupportedException("MapMessage body is not a PrimitiveMap");
+         }
+
+         byte[] buf = mapBody.Marshal();
+         return new Message(buf, 0, buf.Length);
+     }
+
+     StreamMessage streamMessage = message as StreamMessage;
+     if (null != streamMessage)
+     {
+         byte[] content = streamMessage.Content;
+         return new Message(content, 0, content.Length);
+     }
+
+     if (message is BaseMessage)
+     {
+         return new Message();
+     }
+
+     // NotSupportedException derives from Exception, so callers that catch
+     // Exception keep working.
+     throw new NotSupportedException("unhandled message type");
+ }
+
+ /// <summary>
+ /// Creates the NMS message that carries the body of an AMQP message, chosen
+ /// by the AMQP content type. Only the text case is built so far.
+ /// </summary>
+ /// <param name="message">The AMQP message whose body is converted.</param>
+ /// <returns>
+ /// A TextMessage holding the AMQP content, or null for the "amqp/map" and
+ /// "amqp/list" content types, which are not converted yet.
+ /// </returns>
+ protected virtual BaseMessage CreateNmsMessage(Message message)
+ {
+     switch (message.ContentType)
+     {
+         case "amqp/map":
+             // TODO: Return map message
+             return null;
+
+         case "amqp/list":
+             // TODO: Return list message
+             return null;
+
+         default:
+             TextMessage textMessage = new TextMessage();
+             textMessage.Text = message.GetContent();
+             return textMessage;
+     }
+ }
+ #endregion
+
+ #region Address/Destination Conversion Methods
+ /// <summary>
+ /// Converts an NMS destination to a Qpid Address built from the
+ /// destination's Path.
+ /// </summary>
+ /// <param name="destination">The NMS destination; may be null.</param>
+ /// <returns>The AMQP address, or null when destination is null.</returns>
+ /// <exception cref="ArgumentException">
+ /// The destination is not a Destination instance, so it has no Path to use.
+ /// </exception>
+ public Address ToAmqpAddress(IDestination destination)
+ {
+     if (null == destination)
+     {
+         return null;
+     }
+
+     Destination amqpDestination = destination as Destination;
+     if (null == amqpDestination)
+     {
+         // Previously this case surfaced as a NullReferenceException when
+         // the failed cast was dereferenced.
+         throw new ArgumentException(
+             "Unsupported destination type: " + destination.GetType().FullName, "destination");
+     }
+
+     return new Address(amqpDestination.Path);
+ }
+
+ /// <summary>
+ /// Converts a Qpid Address to an NMS destination by wrapping the address
+ /// text in a Queue.
+ /// </summary>
+ /// <param name="destinationQueue">The AMQP address; may be null.</param>
+ /// <returns>A Queue named after the address, or null when the address is null.</returns>
+ protected virtual IDestination ToNmsDestination(Address destinationQueue)
+ {
+     if (null != destinationQueue)
+     {
+         string addressText = destinationQueue.ToString();
+         return new Queue(addressText);
+     }
+
+     return null;
+ }
+ #endregion
+
+ #region PrimitiveMap Conversion Methods
+
+ /// <summary>
+ /// Replaces the contents of an NMS primitive map with the entries of a
+ /// dictionary: the map is cleared first, then every key/value pair is copied.
+ /// </summary>
+ /// <param name="map">The NMS map to overwrite.</param>
+ /// <param name="dict">The dictionary supplying the new entries.</param>
+ public void SetNmsPrimitiveMap(IPrimitiveMap map, Dictionary<string, object> dict)
+ {
+     // TODO: lock?
+     map.Clear();
+
+     foreach (string name in dict.Keys)
+     {
+         map[name] = dict[name];
+     }
+ }
+
+ /// <summary>
+ /// Copies an NMS primitive map into a new dictionary. Each key is converted
+ /// to its string form and used both as the dictionary key and to look the
+ /// value up in the map.
+ /// </summary>
+ /// <param name="pm">The NMS map to copy.</param>
+ /// <returns>A new dictionary holding the map's entries.</returns>
+ public Dictionary<string, object> FromNmsPrimitiveMap(IPrimitiveMap pm)
+ {
+     Dictionary<string, object> copy = new Dictionary<string, object>();
+
+     // TODO: lock?
+     foreach (object rawKey in pm.Keys)
+     {
+         string name = rawKey.ToString();
+         copy.Add(name, pm[name]);
+     }
+
+     return copy;
+ }
+ #endregion
+ }
}
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs?rev=1556053&r1=1556052&r2=1556053&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageConsumer.cs Mon Jan 6 22:23:35 2014
@@ -76,7 +76,7 @@ namespace Apache.NMS.Amqp
try
{
// Create qpid sender
- Console.WriteLine("Start Consumer Id = " + ConsumerId.ToString());
+ Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString());
if (qpidReceiver == null)
{
qpidReceiver = session.CreateQpidReceiver(destination.ToString());
@@ -141,8 +141,11 @@ namespace Apache.NMS.Amqp
{
IMessage nmsMessage = null;
- // TODO: Receive a message
-
+ Message qpidMessage = new Message();
+ if (qpidReceiver.Fetch(ref qpidMessage))
+ {
+ nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage);
+ }
return nmsMessage;
}
Modified: activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs?rev=1556053&r1=1556052&r2=1556053&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.AMQP/trunk/src/main/csharp/MessageProducer.cs Mon Jan 6 22:23:35 2014
@@ -76,7 +76,7 @@ namespace Apache.NMS.Amqp
try
{
// Create qpid sender
- Console.WriteLine("Start Producer Id = " + ProducerId.ToString());
+ Tracer.DebugFormat("Start Producer Id = " + ProducerId.ToString());
if (qpidSender == null)
{
qpidSender = session.CreateQpidSender(destination.ToString());
@@ -102,7 +102,7 @@ namespace Apache.NMS.Amqp
{
try
{
- Console.WriteLine("Stop Producer Id = " + ProducerId);
+ Tracer.DebugFormat("Stop Producer Id = " + ProducerId);
qpidSender.Dispose();
qpidSender = null;
}
@@ -158,7 +158,7 @@ namespace Apache.NMS.Amqp
// Convert the Message into a Amqp message
Message msg = session.MessageConverter.ToAmqpMessage(message);
- // TODO: send the message!
+ qpidSender.Send(msg);
}
catch (Exception e)
{