You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2014/10/09 03:38:53 UTC

svn commit: r1630267 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp: BaseMessage.cs Destination.cs MessageConsumer.cs MessageProducer.cs TemporaryQueue.cs TemporaryTopic.cs

Author: jgomes
Date: Thu Oct  9 01:38:53 2014
New Revision: 1630267

URL: http://svn.apache.org/r1630267
Log:
Add support for serializing/deserializing BytesMessages.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs Thu Oct  9 01:38:53 2014
@@ -36,12 +36,19 @@ namespace Apache.NMS.ZMQ
 		private string type;
 		private event AcknowledgeHandler Acknowledger;
 		private DateTime timestamp = new DateTime();
+		private bool readOnlyMsgProperties = false;
 		private bool readOnlyMsgBody = false;
 
-		public bool ReadOnlyBody
+		public virtual bool ReadOnlyProperties
 		{
-			get { return readOnlyMsgBody; }
-			set { readOnlyMsgBody = value; }
+			get { return this.readOnlyMsgProperties; }
+			set { this.readOnlyMsgProperties = value; }
+		}
+
+		public virtual bool ReadOnlyBody
+		{
+			get { return this.readOnlyMsgBody; }
+			set { this.readOnlyMsgBody = value; }
 		}
 
 		// IMessage interface
@@ -155,7 +162,6 @@ namespace Apache.NMS.ZMQ
 			set { }
 		}
 
-
 		/// <summary>
 		/// The destination that the consumer of this message should send replies to
 		/// </summary>
@@ -190,7 +196,6 @@ namespace Apache.NMS.ZMQ
 			set { type = value; }
 		}
 
-
 		public object GetObjectProperty(string name)
 		{
 			return null;
@@ -200,9 +205,15 @@ namespace Apache.NMS.ZMQ
 		{
 		}
 
+		public virtual void OnSend()
+		{
+			this.ReadOnlyProperties = true;
+			this.ReadOnlyBody = true;
+		}
+
 		protected void FailIfReadOnlyBody()
 		{
-			if(ReadOnlyBody == true)
+			if(ReadOnlyBody)
 			{
 				throw new MessageNotWriteableException("Message is in Read-Only mode.");
 			}
@@ -210,7 +221,7 @@ namespace Apache.NMS.ZMQ
 
 		protected void FailIfWriteOnlyBody()
 		{
-			if(ReadOnlyBody == false)
+			if(!ReadOnlyBody)
 			{
 				throw new MessageNotReadableException("Message is in Write-Only mode.");
 			}

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Thu Oct  9 01:38:53 2014
@@ -36,6 +36,7 @@ namespace Apache.NMS.ZMQ
 		protected ZmqSocket producerEndpoint = null;
 		protected ZmqSocket consumerEndpoint = null;
 		protected string destinationName;
+		internal byte[] rawDestinationName;
 
 		private bool disposed = false;
 
@@ -47,6 +48,7 @@ namespace Apache.NMS.ZMQ
 		{
 			this.session = session;
 			this.destinationName = destName;
+			this.rawDestinationName = Destination.encoding.GetBytes(this.destinationName);
 			this.session.RegisterDestination(this);
 		}
 
@@ -88,23 +90,8 @@ namespace Apache.NMS.ZMQ
 		/// </summary>
 		protected virtual void OnDispose()
 		{
-			if(null != this.producerEndpoint)
-			{
-				if(null != this.session
-					&& null != this.session.Connection)
-				{
-					this.session.Connection.ReleaseProducer(this.producerEndpoint);
-				}
-
-				this.producerEndpoint = null;
-			}
-
-			if(null != this.consumerEndpoint)
-			{
-				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
-				this.consumerEndpoint = null;
-			}
-
+			DeinitSender();
+			DeinitReceiver();
 			this.session.UnregisterDestination(this);
 		}
 
@@ -190,6 +177,20 @@ namespace Apache.NMS.ZMQ
 			}
 		}
 
+		internal void DeinitSender()
+		{
+			if(null != this.producerEndpoint)
+			{
+				if(null != this.session
+					&& null != this.session.Connection)
+				{
+					this.session.Connection.ReleaseProducer(this.producerEndpoint);
+				}
+
+				this.producerEndpoint = null;
+			}
+		}
+
 		internal void InitReceiver()
 		{
 			if(null == this.consumerEndpoint)
@@ -198,34 +199,27 @@ namespace Apache.NMS.ZMQ
 
 				this.consumerEndpoint = connection.GetConsumer();
 				// Must subscribe first before connecting to the endpoint binding
-				this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+				this.consumerEndpoint.Subscribe(this.rawDestinationName);
 				this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
 			}
 		}
 
-		internal void Subscribe(string prefixName)
-		{
-			InitReceiver();
-			this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
-		}
-
-		internal void Unsubscribe(string prefixName)
+		internal void DeinitReceiver()
 		{
 			if(null != this.consumerEndpoint)
 			{
-				this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+				this.consumerEndpoint = null;
 			}
 		}
 
 		internal SendStatus Send(string msg)
 		{
-			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
 			return this.producerEndpoint.Send(msg, Destination.encoding);
 		}
 
 		internal SendStatus Send(byte[] buffer)
 		{
-			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
 			return this.producerEndpoint.Send(buffer);
 		}
 
@@ -246,20 +240,6 @@ namespace Apache.NMS.ZMQ
 			this.InitReceiver();
 			return this.consumerEndpoint.Receive(null, flags, out size);
 		}
-
-		internal Frame ReceiveFrame()
-		{
-			// TODO: Implement
-			this.InitReceiver();
-			return null;
-		}
-
-		internal ZmqMessage ReceiveMessage()
-		{
-			// TODO: Implement
-			this.InitReceiver();
-			return null;
-		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Oct  9 01:38:53 2014
@@ -40,7 +40,6 @@ namespace Apache.NMS.ZMQ
 		private object asyncDeliveryLock = new object();
 		private bool asyncDelivery = false;
 		private bool asyncInit = false;
-		private byte[] rawDestinationName;
 
 		private ConsumerTransformerDelegate consumerTransformer;
 		public ConsumerTransformerDelegate ConsumerTransformer
@@ -82,7 +81,6 @@ namespace Apache.NMS.ZMQ
 
 			this.session = sess;
 			this.destination = theDest;
-			this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
 			this.acknowledgementMode = ackMode;
 		}
 
@@ -145,7 +143,7 @@ namespace Apache.NMS.ZMQ
 			if(size > 0)
 			{
 				// Strip off the subscribed destination name.
-				int receivedMsgIndex = this.rawDestinationName.Length;
+				int receivedMsgIndex = this.destination.rawDestinationName.Length;
 				int msgLength = receivedMsg.Length - receivedMsgIndex;
 				byte[] msgContent = new byte[msgLength];
 
@@ -406,6 +404,14 @@ namespace Apache.NMS.ZMQ
 				}
 				break;
 
+			case WireFormat.MT_BYTESMESSAGE:
+				nmsMessage = new BytesMessage();
+				if(null != messageBody)
+				{
+					((BytesMessage) nmsMessage).Content = messageBody;
+				}
+				break;
+
 			case WireFormat.MT_UNKNOWN:
 			default:
 				break;
@@ -444,6 +450,9 @@ namespace Apache.NMS.ZMQ
 						nmsMessage = transformedMessage as BaseMessage;
 					}
 				}
+
+				nmsMessage.ReadOnlyBody = true;
+				nmsMessage.ReadOnlyProperties = true;
 			}
 
 			return nmsMessage;

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Thu Oct  9 01:38:53 2014
@@ -19,8 +19,8 @@
 
 using System;
 using System.Collections.Generic;
-using System.Text;
 using System.Net;
+using System.Text;
 using Apache.NMS.Util;
 
 namespace Apache.NMS.ZMQ
@@ -48,13 +48,35 @@ namespace Apache.NMS.ZMQ
 
 		public MessageProducer(Session sess, IDestination dest)
 		{
-			if(null == sess.Connection.Context)
+			if(null == sess
+				|| null == sess.Connection
+				|| null == sess.Connection.Context)
 			{
 				throw new NMSConnectionException();
 			}
 
+			Destination theDest = dest as Destination;
+
+			if(null == theDest)
+			{
+				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+			}
+			else if(null == theDest.Name)
+			{
+				throw new InvalidDestinationException("The destination object was not given a physical name.");
+			}
+			else if(theDest.IsTemporary)
+			{
+				String physicalName = theDest.Name;
+
+				if(String.IsNullOrEmpty(physicalName))
+				{
+					throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest);
+				}
+			}
+
 			this.session = sess;
-			this.destination = (Destination) dest;
+			this.destination = theDest;
 			this.destination.InitSender();
 		}
 
@@ -150,6 +172,17 @@ namespace Apache.NMS.ZMQ
 					EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
 				}
 			}
+			else if(message is IBytesMessage)
+			{
+				EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE);
+				// Append the message bytes body to the msg.
+				byte[] msgBody = ((IBytesMessage) message).Content;
+
+				if(null != msgBody)
+				{
+					EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
+				}
+			}
 			else
 			{
 				// TODO: Add support for more message types
@@ -158,6 +191,8 @@ namespace Apache.NMS.ZMQ
 
 			// Put the sentinal field marker.
 			EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);
+
+			((BaseMessage) message).OnSend();
 			theDest.Send(msgDataBuilder.ToArray());
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs Thu Oct  9 01:38:53 2014
@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
 	public class TemporaryQueue : Destination, ITemporaryQueue
 	{
 		public TemporaryQueue(Session session)
-			: base(session, Guid.NewGuid().ToString())
+			: base(session, "TEMPQUEUE." + Guid.NewGuid().ToString())
 		{
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Thu Oct  9 01:38:53 2014
@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
 	public class TemporaryTopic : Destination, ITemporaryTopic
 	{
 		public TemporaryTopic(Session session)
-			: base(session, Guid.NewGuid().ToString())
+			: base(session, "TEMPTOPIC." + Guid.NewGuid().ToString())
 		{
 		}