You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2014/10/08 02:59:47 UTC

svn commit: r1629998 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk: ./ src/main/csharp/

Author: jgomes
Date: Wed Oct  8 00:59:46 2014
New Revision: 1629998

URL: http://svn.apache.org/r1629998
Log:
Support serializing/deserializing NMS properties on the wire.
Fix several resource leaks that were causing lock-ups when disposing of connection objects. Many more unit tests can now be run. Some unit tests will never succeed due to the nature of the ZMQ implementation not having a central broker.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)

Added:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config Wed Oct  8 00:59:46 2014
@@ -16,7 +16,7 @@
 * limitations under the License.
 -->
 <configuration>
-	<defaultURI value="zmq://localhost">
+	<defaultURI value="zmq:tcp://localhost:5556">
 		<factoryParams>
 			<param type="string" value="NMSTestClient"/>
 		</factoryParams>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs Wed Oct  8 00:59:46 2014
@@ -73,6 +73,16 @@ namespace Apache.NMS.ZMQ
 			{
 				if(0 == --instanceCount)
 				{
+					lock(producerCacheLock)
+					{
+						foreach(KeyValuePair<string, ProducerRef> cacheItem in producerCache)
+						{
+							cacheItem.Value.producer.Unbind(cacheItem.Key);
+						}
+
+						producerCache.Clear();
+					}
+
 					Connection._context.Dispose();
 				}
 			}
@@ -248,16 +258,6 @@ namespace Apache.NMS.ZMQ
         public void Close()
         {
             Stop();
-
-			lock(producerCacheLock)
-			{
-				foreach(KeyValuePair<string, ProducerRef> cacheItem in producerCache)
-				{
-					cacheItem.Value.producer.Unbind(cacheItem.Key);
-				}
-
-				producerCache.Clear();
-			}
 		}
 
         public void PurgeTempDestinations()

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Wed Oct  8 00:59:46 2014
@@ -16,9 +16,9 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Text;
 using ZeroMQ;
-using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -47,6 +47,7 @@ namespace Apache.NMS.ZMQ
 		{
 			this.session = session;
 			this.destinationName = destName;
+			this.session.RegisterDestination(this);
 		}
 
 		~Destination()
@@ -103,6 +104,8 @@ namespace Apache.NMS.ZMQ
 				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
 				this.consumerEndpoint = null;
 			}
+
+			this.session.UnregisterDestination(this);
 		}
 
 		public string Name

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Oct  8 00:59:46 2014
@@ -19,6 +19,8 @@ using System;
 using System.Diagnostics;
 using System.Text;
 using System.Threading;
+using System.Net;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ZMQ
 {
@@ -121,10 +123,15 @@ namespace Apache.NMS.ZMQ
 			if(size > 0)
 			{
 				// Strip off the subscribed destination name.
-				// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
-				int msgStart = this.rawDestinationName.Length;
-				int msgLength = receivedMsg.Length - msgStart;
-				string msgContent = Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);
+				int receivedMsgIndex = this.rawDestinationName.Length;
+				int msgLength = receivedMsg.Length - receivedMsgIndex;
+				byte[] msgContent = new byte[msgLength];
+
+				for(int index = 0; index < msgLength; index++, receivedMsgIndex++)
+				{
+					msgContent[index] = receivedMsg[receivedMsgIndex];
+				}
+
 				return ToNmsMessage(msgContent);
 			}
 
@@ -203,6 +210,7 @@ namespace Apache.NMS.ZMQ
 			Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
 			TimeSpan receiveWait = TimeSpan.FromSeconds(2);
 
+			this.destination.InitReceiver();
 			// Signal that this thread has started.
 			asyncInit = true;
 
@@ -227,7 +235,7 @@ namespace Apache.NMS.ZMQ
 						}
 						else
 						{
-							Thread.Sleep(0);
+							Thread.Sleep(1);
 						}
 					}
 				}
@@ -258,37 +266,219 @@ namespace Apache.NMS.ZMQ
 		/// <returns>
 		/// nms message object
 		/// </returns>
-		protected virtual IMessage ToNmsMessage(string messageText)
+		protected virtual IMessage ToNmsMessage(byte[] msgData)
 		{
-			// Strip off the destination name prefix.
-			IMessage nmsMessage = new TextMessage(messageText);
+			IMessage nmsMessage = null;
+			int messageType = WireFormat.MT_UNKNOWN;
+			int fieldType = WireFormat.MFT_NONE;
+			DateTime messageTimestamp = DateTime.UtcNow;
+			string messageNMSType = null;
+			string messageCorrelationId = null;
+			IDestination messageReplyTo = null;
+			MsgDeliveryMode messageDeliveryMode = MsgDeliveryMode.NonPersistent;
+			MsgPriority messagePriority = MsgPriority.Normal;
+			TimeSpan messageTimeToLive = TimeSpan.FromTicks(0);
+			IPrimitiveMap messageProperties = null;
+			int fieldLen;
+			int index = 0;
+			string messageID = string.Empty;
+			byte[] messageBody = null;
 
 			try
 			{
-				nmsMessage.NMSMessageId = "";
-				nmsMessage.NMSDestination = this.destination;
-				nmsMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent;
-				nmsMessage.NMSPriority = MsgPriority.Normal;
-				nmsMessage.NMSTimestamp = DateTime.Now;
-				nmsMessage.NMSTimeToLive = new TimeSpan(0);
-				nmsMessage.NMSType = "";
+				// Parse the commond message fields
+				do
+				{
+					fieldType = ReadInt(msgData, ref index);
+					switch(fieldType)
+					{
+					case WireFormat.MFT_NONE:
+						break;
+
+					case WireFormat.MFT_MESSAGEID:
+						messageID = ReadString(msgData, ref index);
+						break;
+
+					case WireFormat.MFT_TIMESTAMP:
+						fieldLen = ReadInt(msgData, ref index);
+						Debug.Assert(sizeof(long) == fieldLen);
+						messageTimestamp = DateTime.FromBinary(ReadLong(msgData, ref index));
+						break;
+
+					case WireFormat.MFT_NMSTYPE:
+						messageNMSType = ReadString(msgData, ref index);
+						break;
+
+					case WireFormat.MFT_CORRELATIONID:
+						messageCorrelationId = ReadString(msgData, ref index);
+						break;
+
+					case WireFormat.MFT_REPLYTO:
+						string replyToDestName = ReadString(msgData, ref index);
+						messageReplyTo = this.session.GetDestination(replyToDestName);
+						break;
+
+					case WireFormat.MFT_DELIVERYMODE:
+						fieldLen = ReadInt(msgData, ref index);
+						Debug.Assert(sizeof(int) == fieldLen);
+						messageDeliveryMode = (MsgDeliveryMode) ReadInt(msgData, ref index);
+						break;
+
+					case WireFormat.MFT_PRIORITY:
+						fieldLen = ReadInt(msgData, ref index);
+						Debug.Assert(sizeof(int) == fieldLen);
+						messagePriority = (MsgPriority) ReadInt(msgData, ref index);
+						break;
+
+					case WireFormat.MFT_TIMETOLIVE:
+						fieldLen = ReadInt(msgData, ref index);
+						Debug.Assert(sizeof(long) == fieldLen);
+						messageTimeToLive = TimeSpan.FromTicks(ReadLong(msgData, ref index));
+						break;
+
+					case WireFormat.MFT_HEADERS:
+						fieldLen = ReadInt(msgData, ref index);
+						int numProperties = ReadInt(msgData, ref index);
+						if(numProperties > 0)
+						{
+							messageProperties = new PrimitiveMap();
+							while(numProperties-- > 0)
+							{
+								string propertyKey = ReadString(msgData, ref index);
+								byte[] propertyVal = ReadBytes(msgData, ref index);
+								messageProperties.SetBytes(propertyKey, propertyVal);
+							}
+						}
+						break;
+
+					case WireFormat.MFT_MSGTYPE:
+						fieldLen = ReadInt(msgData, ref index);
+						Debug.Assert(sizeof(int) == fieldLen);
+						messageType = ReadInt(msgData, ref index);
+						break;
+
+					case WireFormat.MFT_BODY:
+						messageBody = ReadBytes(msgData, ref index);
+						break;
+
+					default:
+						// Skip past this field.
+						Tracer.WarnFormat("Unknown message field type: {0}", fieldType);
+						fieldLen = ReadInt(msgData, ref index);
+						index += fieldLen;
+						break;
+					}
+				} while(WireFormat.MFT_NONE != fieldType && index < msgData.Length);
 			}
-			catch(InvalidOperationException)
+			catch(Exception ex)
 			{
-				// Log error
+				Tracer.ErrorFormat("Exception parsing message: {0}", ex.Message);
 			}
 
-			if(null != this.ConsumerTransformer)
+			// Instantiate the message type
+			switch(messageType)
 			{
-				IMessage transformedMessage = ConsumerTransformer(this.session, this, nmsMessage);
+			case WireFormat.MT_MESSAGE:
+				nmsMessage = new BaseMessage();
+				break;
+
+			case WireFormat.MT_TEXTMESSAGE:
+				nmsMessage = new TextMessage();
+				if(null != messageBody)
+				{
+					((TextMessage) nmsMessage).Text = Encoding.UTF8.GetString(messageBody);
+				}
+				break;
+
+			case WireFormat.MT_UNKNOWN:
+			default:
+				break;
+			}
 
-				if(null != transformedMessage)
+			// Set the common headers.
+			if(null != nmsMessage)
+			{
+				try
 				{
-					nmsMessage = transformedMessage;
+					nmsMessage.NMSMessageId = messageID;
+					nmsMessage.NMSCorrelationID = messageCorrelationId;
+					nmsMessage.NMSDestination = this.destination;
+					nmsMessage.NMSReplyTo = messageReplyTo;
+					nmsMessage.NMSDeliveryMode = messageDeliveryMode;
+					nmsMessage.NMSPriority = messagePriority;
+					nmsMessage.NMSTimestamp = messageTimestamp;
+					nmsMessage.NMSTimeToLive = messageTimeToLive;
+					nmsMessage.NMSType = messageNMSType;
+					if(null != messageProperties)
+					{
+						foreach(string propertyKey in messageProperties.Keys)
+						{
+							nmsMessage.Properties.SetBytes(propertyKey, messageProperties.GetBytes(propertyKey));
+						}
+					}
+				}
+				catch(InvalidOperationException)
+				{
+					// Log error
+				}
+
+				if(null != this.ConsumerTransformer)
+				{
+					IMessage transformedMessage = ConsumerTransformer(this.session, this, nmsMessage);
+
+					if(null != transformedMessage)
+					{
+						nmsMessage = transformedMessage;
+					}
 				}
 			}
 
 			return nmsMessage;
 		}
+
+		private long ReadLong(byte[] msgData, ref int index)
+		{
+			long val = BitConverter.ToInt64(msgData, index);
+			index += sizeof(long);
+			return IPAddress.NetworkToHostOrder(val);
+		}
+
+		private int ReadInt(byte[] msgData, ref int index)
+		{
+			int val = BitConverter.ToInt32(msgData, index);
+			index += sizeof(int);
+			return IPAddress.NetworkToHostOrder(val);
+		}
+
+		private string ReadString(byte[] msgData, ref int index)
+		{
+			int stringLen = ReadInt(msgData, ref index);
+			string stringVal = string.Empty;
+
+			if(stringLen > 0)
+			{
+				stringVal = Encoding.UTF8.GetString(msgData, index, stringLen);
+				index += stringLen;
+			}
+
+			return stringVal;
+		}
+
+		private byte[] ReadBytes(byte[] msgData, ref int index)
+		{
+			int bytesLen = ReadInt(msgData, ref index);
+			byte[] bytesVal = null;
+
+			if(bytesLen >= 0)
+			{
+				bytesVal = new byte[bytesLen];
+				for(int byteIndex = 0; byteIndex < bytesLen; byteIndex++, index++)
+				{
+					bytesVal[byteIndex] = msgData[index];
+				}
+			}
+
+			return bytesVal;
+		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Wed Oct  8 00:59:46 2014
@@ -18,9 +18,9 @@
 #define PUBSUB
 
 using System;
+using System.Collections.Generic;
 using System.Text;
-using ZeroMQ;
-
+using System.Net;
 
 namespace Apache.NMS.ZMQ
 {
@@ -59,24 +59,30 @@ namespace Apache.NMS.ZMQ
 
 		public void Send(IMessage message)
 		{
-			Send(this.destination, message);
+			Send(this.destination, message, this.deliveryMode, this.priority, this.timeToLive, false);
 		}
 
-		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+		public void Send(IDestination dest, IMessage message)
 		{
-			Send(this.destination, message, deliveryMode, priority, timeToLive);
+			Send(dest, message, this.deliveryMode, this.priority, this.timeToLive, false);
 		}
 
-		public void Send(IDestination dest, IMessage message)
+		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			Send(dest, message, this.DeliveryMode, this.Priority, this.TimeToLive);
+			Send(this.destination, message, deliveryMode, priority, timeToLive, true);
 		}
 
 		public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			// UNUSED_PARAM(deliveryMode);	// No concept of different delivery modes in ZMQ
-			// UNUSED_PARAM(priority);		// No concept of priority messages in ZMQ
-			// UNUSED_PARAM(timeToLive);	// No concept of time-to-live in ZMQ
+			Send(destination, message, deliveryMode, priority, timeToLive, true);
+		}
+
+		public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+		{
+			if(null == dest)
+			{
+				return;
+			}
 
 			if(null != this.ProducerTransformer)
 			{
@@ -88,14 +94,145 @@ namespace Apache.NMS.ZMQ
 				}
 			}
 
-			// TODO: Support encoding of all message types + all meta data (e.g., headers and properties)
+			// Serialize the message data
+			Destination theDest = (Destination) dest;
+			List<byte> msgDataBuilder = new List<byte>();
+
+			// Always set the message Id.
+			message.NMSMessageId = Guid.NewGuid().ToString();
+			message.NMSTimestamp = DateTime.UtcNow;
+			if(specifiedTimeToLive)
+			{
+				message.NMSTimeToLive = timeToLive;
+			}
 
 			// Prefix the message with the destination name. The client will subscribe to this destination name
 			// in order to receive messages.
-			Destination theDest = (Destination) dest;
+			msgDataBuilder.AddRange(Encoding.UTF8.GetBytes(theDest.Name));
+
+			// Encode all meta data (e.g., headers and properties)
+			EncodeField(msgDataBuilder, WireFormat.MFT_MESSAGEID, message.NMSMessageId);
+			EncodeField(msgDataBuilder, WireFormat.MFT_TIMESTAMP, message.NMSTimestamp.ToBinary());
+			if(null != message.NMSType)
+			{
+				EncodeField(msgDataBuilder, WireFormat.MFT_NMSTYPE, message.NMSType);
+			}
+
+			if(null != message.NMSCorrelationID)
+			{
+				EncodeField(msgDataBuilder, WireFormat.MFT_CORRELATIONID, message.NMSCorrelationID);
+			}
+
+			if(null != message.NMSReplyTo)
+			{
+				EncodeField(msgDataBuilder, WireFormat.MFT_REPLYTO, ((Destination) message.NMSReplyTo).Name);
+			}
+
+			EncodeField(msgDataBuilder, WireFormat.MFT_DELIVERYMODE, message.NMSDeliveryMode);
+			EncodeField(msgDataBuilder, WireFormat.MFT_PRIORITY, message.NMSPriority);
+			EncodeField(msgDataBuilder, WireFormat.MFT_TIMETOLIVE, message.NMSTimeToLive.Ticks);
+
+			IPrimitiveMap properties = message.Properties;
+			if(null != properties && properties.Count > 0)
+			{
+				// Encode into a temporary buffer, and then place a single buffer into the msgDataBuilder.
+				List<byte> propertiesBuilder = new List<byte>();
+
+				EncodeFieldData(propertiesBuilder, propertiesBuilder.Count);
+				foreach(string propertyKey in properties.Keys)
+				{
+					EncodeFieldData(propertiesBuilder, propertyKey);
+					EncodeFieldData(propertiesBuilder, properties.GetBytes(propertyKey));
+				}
+
+				EncodeField(msgDataBuilder, WireFormat.MFT_HEADERS, propertiesBuilder.ToArray());
+			}
+
+			if(message is ITextMessage)
+			{
+				EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_TEXTMESSAGE);
+				// Append the message text body to the msg.
+				string msgBody = ((ITextMessage) message).Text;
+
+				if(null != msgBody)
+				{
+					EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
+				}
+			}
+			else
+			{
+				// TODO: Add support for more message types
+				EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_MESSAGE);
+			}
+
+			// Put the sentinal field marker.
+			EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);
+			theDest.Send(msgDataBuilder.ToArray());
+		}
+
+		private void EncodeField(List<byte> msgDataBuilder, int msgFieldType, string fieldData)
+		{
+			if(null == fieldData)
+			{
+				fieldData = string.Empty;
+			}
+
+			EncodeField(msgDataBuilder, msgFieldType, Encoding.UTF8.GetBytes(fieldData));
+		}
+
+		private void EncodeField(List<byte> msgDataBuilder, int msgFieldType, Enum fieldData)
+		{
+			EncodeField(msgDataBuilder, msgFieldType, Convert.ToInt32(fieldData));
+		}
+
+		private void EncodeField(List<byte> msgDataBuilder, int msgFieldType, int fieldData)
+		{
+			EncodeField(msgDataBuilder, msgFieldType, BitConverter.GetBytes(IPAddress.HostToNetworkOrder(fieldData)));
+		}
+
+		private void EncodeField(List<byte> msgDataBuilder, int msgFieldType, long fieldData)
+		{
+			EncodeField(msgDataBuilder, msgFieldType, BitConverter.GetBytes(IPAddress.HostToNetworkOrder(fieldData)));
+		}
 
-			string msg = theDest.Name + ((ITextMessage) message).Text;
-			theDest.Send(msg);
+		private void EncodeField(List<byte> msgDataBuilder, int msgFieldType, byte[] fieldData)
+		{
+			// Encode the field type
+			msgDataBuilder.AddRange(BitConverter.GetBytes(IPAddress.HostToNetworkOrder(msgFieldType)));
+			EncodeFieldData(msgDataBuilder, fieldData);
+		}
+
+		private void EncodeFieldData(List<byte> msgDataBuilder, int fieldData)
+		{
+			msgDataBuilder.AddRange(BitConverter.GetBytes(IPAddress.HostToNetworkOrder(fieldData)));
+		}
+
+		private void EncodeFieldData(List<byte> msgDataBuilder, string fieldData)
+		{
+			if(null == fieldData)
+			{
+				fieldData = string.Empty;
+			}
+
+			EncodeFieldData(msgDataBuilder, Encoding.UTF8.GetBytes(fieldData));
+		}
+
+		private void EncodeFieldData(List<byte> msgDataBuilder, byte[] fieldData)
+		{
+			// Encode the field length
+			int fieldLength = 0;
+
+			if(null != fieldData)
+			{
+				fieldLength = fieldData.Length;
+			}
+
+			EncodeFieldData(msgDataBuilder, fieldLength);
+			if(0 != fieldLength)
+			{
+				// Encode the field data
+				msgDataBuilder.AddRange(fieldData);
+			}
 		}
 
 		public void Dispose()

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs Wed Oct  8 00:59:46 2014
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using System.Messaging;
 
 namespace Apache.NMS.ZMQ
@@ -28,6 +29,8 @@ namespace Apache.NMS.ZMQ
         private Connection connection;
         private AcknowledgementMode acknowledgementMode;
         private MessageQueueTransaction messageQueueTransaction;
+		private List<Destination> destinations = new List<Destination>();
+		private object destinationLock = new object();
 
         public Session(Connection connection, AcknowledgementMode acknowledgementMode)
         {
@@ -46,13 +49,49 @@ namespace Apache.NMS.ZMQ
 
         public void Close()
         {
-            if(MessageQueueTransaction != null)
+			List<Destination> closingDestinations = null;
+
+			lock(destinationLock)
+			{
+				if(destinations.Count > 0)
+				{
+					closingDestinations = new List<Destination>(destinations);
+				}
+
+				destinations.Clear();
+			}
+
+			if(null != closingDestinations)
+			{
+				foreach(Destination dest in closingDestinations)
+				{
+					dest.Dispose();
+				}
+			}
+			
+			if(MessageQueueTransaction != null)
             {
                 MessageQueueTransaction.Dispose();
                 MessageQueueTransaction = null;
             }
         }
 
+		internal void RegisterDestination(Destination dest)
+		{
+			lock(destinationLock)
+			{
+				destinations.Add(dest);
+			}
+		}
+
+		internal void UnregisterDestination(Destination dest)
+		{
+			lock(destinationLock)
+			{
+				destinations.Remove(dest);
+			}
+		}
+
         #region Producer methods
         public IMessageProducer CreateProducer()
         {

Added: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs?rev=1629998&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs Wed Oct  8 00:59:46 2014
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.ZMQ
+{
+	public class WireFormat
+	{
+		/// <summary>
+		/// Message Field Types
+		/// IMPORTANT: These assigned numbers cannot change. If new field types
+		/// are added, then they must be assigned new numbers. Do not re-use numbers.
+		/// </summary>
+		public const int MFT_NONE = 0;
+		public const int MFT_MESSAGEID = 1;
+		public const int MFT_TIMESTAMP = 2;
+		public const int MFT_NMSTYPE = 3;
+		public const int MFT_CORRELATIONID = 4;
+		public const int MFT_REPLYTO = 5;
+		public const int MFT_DELIVERYMODE = 6;
+		public const int MFT_PRIORITY = 7;
+		public const int MFT_TIMETOLIVE = 8;
+		public const int MFT_HEADERS = 9;
+		public const int MFT_MSGTYPE = 10;
+		public const int MFT_BODY = 11;
+
+		// Message Types
+		/// <summary>
+		/// Message Types
+		/// These are the base message types. This is a sub-field of the MFT_MSGTYPE message field type.
+		/// IMPORTANT: These numbers cannot be changed. It will break wireformat compatibility.
+		/// </summary>
+		public const int MT_UNKNOWN = 0;
+		public const int MT_MESSAGE = 1;
+		public const int MT_TEXTMESSAGE = 2;
+		public const int MT_BYTESMESSAGE = 3;
+		public const int MT_MAPMESSAGE = 4;
+		public const int MT_OBJECTMESSAGE = 5;
+		public const int MT_STREAMMESSAGE = 6;
+
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj Wed Oct  8 00:59:46 2014
@@ -57,6 +57,9 @@
     <None Include="Apache.NMS.ZMQ.Test.nunit">
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
     </None>
+    <Content Include="nmsprovider-test.config">
+      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+    </Content>
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="vs2010-zmq-net-4.0.csproj">

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj Wed Oct  8 00:59:46 2014
@@ -74,6 +74,7 @@
     </Compile>
     <Compile Include="src\main\csharp\TextMessage.cs" />
     <Compile Include="src\main\csharp\Utils.cs" />
+    <Compile Include="src\main\csharp\WireFormat.cs" />
   </ItemGroup>
   <ItemGroup>
     <Content Include="lib\clrzmq\net-4.0\libzmq.dll">