You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/11/05 23:33:49 UTC

svn commit: r833220 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/Commands/ test/csharp/Commands/

Author: tabish
Date: Thu Nov  5 22:33:48 2009
New Revision: 833220

URL: http://svn.apache.org/viewvc?rev=833220&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-205

Add support for message compression.  Currently this can only use the built-in DeflateStream classes, so compressed messages can only be read when two .NET clients are talking to each other via a Broker.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs Thu Nov  5 22:33:48 2009
@@ -20,6 +20,7 @@
 using System;
 using System.Collections;
 using System.IO;
+using System.IO.Compression;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
@@ -30,9 +31,7 @@
 		private EndianBinaryReader dataIn = null;
 		private EndianBinaryWriter dataOut = null;
 		private MemoryStream outputBuffer = null;
-
-		// Need this later when we add compression to store true content length.
-		private long length = 0;
+		private int length = 0;
 
 		public override byte GetDataStructureType()
 		{
@@ -450,6 +449,27 @@
 			}
 		}
 
+        public new byte[] Content
+        {
+            get
+            {
+				byte[] buffer = null;
+                InitializeReading();
+				if(this.length != 0)
+				{
+                	buffer = new byte[this.length];
+                	this.dataIn.Read(buffer, 0, buffer.Length);
+				}
+				return buffer;
+            }
+
+            set
+            {
+                InitializeWriting();
+                this.dataOut.Write(value, 0, value.Length);
+            }
+        }
+
 		public void Reset()
 		{
 			StoreContent();
@@ -464,14 +484,28 @@
 			FailIfWriteOnlyBody();
 			if(this.dataIn == null)
 			{
-				if(this.Content != null)
+                byte[] data = base.Content;
+                
+				if(base.Content == null)
 				{
-					this.length = this.Content.Length;
+					data = new byte[0];
 				}
-
-				// TODO - Add support for Message Compression.
-				MemoryStream bytesIn = new MemoryStream(this.Content, false);
-				dataIn = new EndianBinaryReader(bytesIn);
+                
+                Stream target = new MemoryStream(data, false);
+                
+                if(this.Connection != null && this.Compressed == true)
+                {
+                    EndianBinaryReader reader = new EndianBinaryReader(target);
+                    this.length = reader.ReadInt32();
+                    
+                    target = new DeflateStream(target, CompressionMode.Decompress);
+                }
+                else
+                {
+                    this.length = data.Length;
+                }
+                
+				this.dataIn = new EndianBinaryReader(target);
 			}
 		}
 
@@ -480,25 +514,138 @@
 			FailIfReadOnlyBody();
 			if(this.dataOut == null)
 			{
-				// TODO - Add support for Message Compression.
 				this.outputBuffer = new MemoryStream();
-				this.dataOut = new EndianBinaryWriter(outputBuffer);
+                Stream target = this.outputBuffer;
+
+                if(this.Connection != null && this.Connection.UseCompression)
+                {
+                    this.length = 0;
+					this.Compressed = true;
+
+                    target = new DeflateStream(target, CompressionMode.Compress);
+                    target = new LengthTrackerStream(target, this);
+                }
+                
+				this.dataOut = new EndianBinaryWriter(target);
 			}
 		}
 
 		private void StoreContent()
 		{
-			if( dataOut != null)
+			if(this.dataOut != null)
 			{
-				dataOut.Close();
-				// TODO - Add support for Message Compression.
+                if(this.Compressed == true)
+                {
+                    MemoryStream final = new MemoryStream();
+                    EndianBinaryWriter writer = new EndianBinaryWriter(final);                    
+
+                    this.dataOut.Close();
+                    byte[] compressed = this.outputBuffer.ToArray();
+
+                    writer.Write(this.length);
+                    writer.Write(compressed, 0, compressed.Length);
+                    writer.Close();
+                    
+                    base.Content = final.ToArray();
+                }
+                else
+                {
+                    this.dataOut.Close();
+                    base.Content = outputBuffer.ToArray();
+                }
 
-				this.Content = outputBuffer.ToArray();
 				this.dataOut = null;
 				this.outputBuffer = null;
 			}
 		}
 
+        /// <summary>
+        /// Used when the message compression is enabled to track how many bytes
+        /// the EndianBinaryWriter actually writes to the stream before compression
+        /// so that the receiving client can read off the real bodylength from the
+        /// Message before the data is actually read.
+        /// </summary>
+        private class LengthTrackerStream : Stream
+        {
+            private ActiveMQBytesMessage parent;
+            private Stream sink;
+
+            public LengthTrackerStream(Stream sink, ActiveMQBytesMessage parent) : base()
+            {
+                this.sink = sink;
+                this.parent = parent;
+            }
+
+            public override void Close()
+            {
+                this.sink.Close();
+                base.Close();
+            }
+
+            public override long Position
+            {
+                get { return this.sink.Position; }
+                set { this.sink.Position = value; }
+            }
+            
+            public override long Length
+            {
+                get { return this.sink.Length; }
+            }
+            
+            public override bool CanSeek
+            { 
+                get { return this.sink.CanSeek; } 
+            }
+            
+            public override bool CanRead 
+            { 
+                get { return this.sink.CanRead; } 
+            }
+
+            public override bool CanWrite
+            {
+                get { return this.sink.CanWrite; }
+            }
+
+            public override int ReadByte()
+            {
+                return this.sink.ReadByte();
+            }
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                return this.sink.Read(buffer, offset, count);
+            }
+            
+            public override void WriteByte(byte value)
+            {
+                this.parent.length++;
+                this.sink.WriteByte(value);
+            }
+            
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                this.parent.length += count;
+                this.sink.Write(buffer, offset, count);
+            }
+
+            public override void Flush()
+            {
+                this.sink.Flush();
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                return this.sink.Seek(offset, origin);
+            }
+
+            public override void SetLength(long value)
+            {
+                this.sink.SetLength(value);
+            }
+        }
+        
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs Thu Nov  5 22:33:48 2009
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 
+using System;
+using System.IO;
+using System.IO.Compression;
 using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.OpenWire;
 
@@ -62,13 +65,29 @@
 		{
 			get
 			{
-				if(body == null)
-				{
-					body = PrimitiveMap.Unmarshal(Content);
-                    typeConverter = new PrimitiveMapInterceptor(this, body);
+				if(this.body == null)
+				{					
+					if(this.Content != null && this.Content.Length > 0)
+					{
+	                    MemoryStream buffer = new MemoryStream(this.Content);
+						Stream source = buffer;
+
+						if(this.Connection != null && this.Compressed)
+						{
+	                    	source = new DeflateStream(source, CompressionMode.Decompress);
+						}
+
+						this.body = PrimitiveMap.Unmarshal(source);
+					}
+					else
+					{
+						this.body = new PrimitiveMap();
+					}
+
+	                this.typeConverter = new PrimitiveMapInterceptor(this, this.body);
 				}
-				
-                return typeConverter;
+
+                return this.typeConverter;
 			}
 		}
 
@@ -80,7 +99,20 @@
 			}
 			else
 			{
-				Content = body.Marshal();
+				MemoryStream buffer = new MemoryStream();
+				Stream target = buffer;
+
+                if(this.Connection != null && this.Connection.UseCompression)
+                {
+                    target = new DeflateStream(target, CompressionMode.Compress);
+					
+					this.Compressed = true;
+                }
+                
+				this.body.Marshal(target);
+				target.Close();
+                
+				this.Content = buffer.ToArray();
 			}
 
 			Tracer.Debug("BeforeMarshalling, content is: " + Content);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMessage.cs Thu Nov  5 22:33:48 2009
@@ -30,6 +30,7 @@
 
 		private MessagePropertyIntercepter propertyHelper;
 		private PrimitiveMap properties;
+		private Connection connection;
 
 		public event AcknowledgeHandler Acknowledger;
 
@@ -38,11 +39,7 @@
 			return (ActiveMQMessage) message;
 		}
 
-		// TODO generate Equals method
-		// TODO generate GetHashCode method
-
-		public ActiveMQMessage()
-			: base()
+		public ActiveMQMessage() : base()
 		{
 			Timestamp = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
 		}
@@ -97,8 +94,8 @@
 
 		public virtual void ClearBody()
 		{
-			this.Content = null;
 			this.ReadOnlyBody = false;
+			this.Content = null;
 		}
 
 		public virtual void ClearProperties()
@@ -159,6 +156,12 @@
 			get { return Destination; }
 			set { this.Destination = ActiveMQDestination.Transform(value); }
 		}
+		
+		public Connection Connection
+		{
+			get { return this.connection; }
+			set { this.connection = value; }
+		}
 
 		/// <summary>
 		/// The correlation ID used to correlate messages with conversations or long running business processes

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs Thu Nov  5 22:33:48 2009
@@ -17,6 +17,7 @@
 
 using System;
 using System.IO;
+using System.IO.Compression;
 using System.Collections;
 using Apache.NMS;
 using Apache.NMS.Util;
@@ -882,9 +883,15 @@
 			FailIfWriteOnlyBody();
 			if(this.dataIn == null)
 			{
-				// TODO - Add support for Message Compression.
 				this.byteBuffer = new MemoryStream(this.Content, false);
-				dataIn = new EndianBinaryReader(byteBuffer);
+
+                Stream target = this.byteBuffer;
+                if(this.Connection != null && this.Compressed == true)
+                {
+                    target = new DeflateStream(this.byteBuffer, CompressionMode.Decompress);
+                }
+                
+				this.dataIn = new EndianBinaryReader(target);
 			}
 		}
 
@@ -893,9 +900,17 @@
 			FailIfReadOnlyBody();
 			if(this.dataOut == null)
 			{
-				// TODO - Add support for Message Compression.
-				this.byteBuffer = new MemoryStream();
-				this.dataOut = new EndianBinaryWriter(byteBuffer);
+                this.byteBuffer = new MemoryStream();
+
+                Stream target = this.byteBuffer;
+                if(this.Connection != null && this.Connection.UseCompression)
+                {
+                    target = new DeflateStream(this.byteBuffer, CompressionMode.Compress);
+					
+					this.Compressed = true;
+                }
+
+				this.dataOut = new EndianBinaryWriter(target);
 			}
 		}
 
@@ -904,7 +919,6 @@
 			if( dataOut != null)
 			{
 				dataOut.Close();
-				// TODO - Add support for Message Compression.
 
 				this.Content = byteBuffer.ToArray();
 				this.dataOut = null;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs Thu Nov  5 22:33:48 2009
@@ -17,6 +17,7 @@
 
 using System;
 using System.IO;
+using System.IO.Compression;
 using Apache.NMS;
 using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.OpenWire;
@@ -70,8 +71,13 @@
                 {
     				if(this.text == null && this.Content != null)
     				{
-                        // TODO - Handle Compression
-    					MemoryStream stream = new MemoryStream(this.Content);
+    					Stream stream = new MemoryStream(this.Content);
+
+                        if(this.Connection != null && this.Compressed == true)
+                        {
+                            stream = new DeflateStream(stream, CompressionMode.Decompress);
+                        }
+                        
     					EndianBinaryReader reader = new EndianBinaryReader(stream);
     					this.text = reader.ReadString32();
                         this.Content = null;
@@ -99,15 +105,23 @@
             if(this.Content == null && text != null)
             {
                 byte[] data = null;
-
-                // TODO - Deal with Compressoin.
                 
                 // Set initial size to the size of the string the UTF-8 encode could
                 // result in more if there are chars that encode to multibye values.
-                MemoryStream stream = new MemoryStream(text.Length);
-                EndianBinaryWriter writer = new EndianBinaryWriter(stream);
+                MemoryStream buffer = new MemoryStream(text.Length);
+                Stream target = buffer;
+				
+                if(this.Connection != null && this.Connection.UseCompression)
+                {
+                    target = new DeflateStream(target, CompressionMode.Compress);
+					
+					this.Compressed = true;
+                }
+                
+                EndianBinaryWriter writer = new EndianBinaryWriter(target);
                 writer.WriteString32(text);
-                data = stream.GetBuffer();
+                target.Close();
+                data = buffer.ToArray();
                 
                 this.Content = data;
                 this.text = null;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Thu Nov  5 22:33:48 2009
@@ -44,6 +44,7 @@
         private bool asyncSend = false;
         private bool alwaysSyncSend = false;
         private bool asyncClose = true;
+        private bool useCompression = false;
         private bool copyMessageOnSend = true;
         private int producerWindowSize = 0;
         private bool connected = false;
@@ -182,6 +183,19 @@
             set { copyMessageOnSend = value; }
         }
 
+        /// <summary>
+        /// Enable or Disable the use of Compression on Message bodies.  When enabled all
+        /// messages have their body compressed using the Deflate compression algorithm.
+        /// The recipient of the message must support the use of message compression as well
+        /// otherwise the receiving client will receive a message whose body appears in the
+        /// compressed form.
+        /// </summary>
+        public bool UseCompression
+        {
+            get { return this.useCompression; }
+            set { this.useCompression = value; }
+        }
+
         public IConnectionMetaData MetaData
         {
             get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Thu Nov  5 22:33:48 2009
@@ -37,6 +37,7 @@
 		private string connectionUserName;
 		private string connectionPassword;
 		private string clientId;
+        private bool useCompression;
         
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -97,15 +98,18 @@
 			ITransport transport = TransportFactory.CreateTransport(uri);
 			Connection connection = new Connection(uri, transport, info);
 
+            // Set the Factory level configuration to the Connection; this can be overridden by
+            // the params on the Connection URI so we do this before applying the params.
+            connection.UseCompression = this.useCompression;
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+            
 			// Set properties on connection using parameters prefixed with "connection."
 			// Since this could be a composite Uri, assume the connection-specific parameters
 			// are associated with the outer-most specification of the composite Uri. What's nice
 			// is that this works with simple Uri as well.
 			URISupport.CompositeData c = URISupport.parseComposite(uri);
 			URISupport.SetProperties(connection, c.Parameters, "connection.");
-
-            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
             
 			connection.ITransport.Start();
 			return connection;
@@ -140,6 +144,12 @@
 			set { clientId = value; }
 		}
 
+        public bool UseCompression
+        {
+            get { return this.useCompression; }
+            set { this.useCompression = value; }
+        }
+
         public IRedeliveryPolicy RedeliveryPolicy
         {
             get { return this.redeliveryPolicy; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Nov  5 22:33:48 2009
@@ -584,7 +584,7 @@
                 } 
                 else if(dispatch.Message.IsExpired())
                 {
-                    Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch);
+                    Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
                     
                     BeforeMessageIsConsumed(dispatch);
                     AfterMessageIsConsumed(dispatch, true);
@@ -596,7 +596,6 @@
                 } 
                 else 
                 {
-                    Tracer.DebugFormat("{0} received message: {1}", info.ConsumerId, dispatch);
                     return dispatch;
                 }
             }
@@ -920,6 +919,8 @@
         private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch) 
         {
             ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
+			
+			message.Connection = this.session.Connection;
             
             if(this.session.IsClientAcknowledge)
             {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Nov  5 22:33:48 2009
@@ -514,48 +514,48 @@
         public IMessage CreateMessage()
         {
             ActiveMQMessage answer = new ActiveMQMessage();
-            return answer;
+            return ConfigureMessage(answer) as IMessage;
         }
 
         public ITextMessage CreateTextMessage()
         {
             ActiveMQTextMessage answer = new ActiveMQTextMessage();
-            return answer;
+            return ConfigureMessage(answer) as ITextMessage;
         }
 
         public ITextMessage CreateTextMessage(string text)
         {
             ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
-            return answer;
+            return ConfigureMessage(answer) as ITextMessage;
         }
 
         public IMapMessage CreateMapMessage()
         {
-            return new ActiveMQMapMessage();
+            return ConfigureMessage(new ActiveMQMapMessage()) as IMapMessage;
         }
 
         public IBytesMessage CreateBytesMessage()
         {
-            return new ActiveMQBytesMessage();
+            return ConfigureMessage(new ActiveMQBytesMessage()) as IBytesMessage;
         }
 
         public IBytesMessage CreateBytesMessage(byte[] body)
         {
             ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
             answer.Content = body;
-            return answer;
+            return ConfigureMessage(answer) as IBytesMessage;
         }
 
 		public IStreamMessage CreateStreamMessage()
 		{
-			return new ActiveMQStreamMessage();
+			return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage;
 		}
 		
 		public IObjectMessage CreateObjectMessage(object body)
         {
             ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
             answer.Body = body;
-            return answer;
+            return ConfigureMessage(answer) as IObjectMessage;
         }
 
         public void Commit()
@@ -804,6 +804,32 @@
                 }                
             }
         }
+
+        private ActiveMQMessage ConfigureMessage(ActiveMQMessage message)
+        {
+            message.Connection = this.connection;
+
+            if(this.IsTransacted)
+            {
+                // Allows Acknowledge to be called in a transaction with no effect per JMS Spec.
+                message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+            }
+
+            return message;
+        }
+
+        /// <summary>
+        /// Prevents message from throwing an exception if a client calls Acknowledge on
+        /// a message that is part of a transaction either being produced or consumed.  The
+        /// JMS Spec indicates that users should be able to call Acknowledge with no effect
+        /// if the message is in a transaction.
+        /// </summary>
+        /// <param name="message">
+        /// A <see cref="ActiveMQMessage"/>
+        /// </param>
+        private void DoNothingAcknowledge(ActiveMQMessage message)
+        {
+        }
         
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQBytesMessageTest.cs Thu Nov  5 22:33:48 2009
@@ -30,9 +30,24 @@
 		{
 			ActiveMQBytesMessage message = new ActiveMQBytesMessage();
 			
-			Assert.IsNull( message.Content );
+			// Test that a BytesMessage is created in WriteOnly mode.
+			try
+			{
+				byte[] content = message.Content;
+				content.SetValue(0, 0);
+				Assert.Fail("Should have thrown an exception");
+			}
+			catch
+			{
+			}
+			
 			Assert.IsTrue( !message.ReadOnlyBody );
 			Assert.IsTrue( !message.ReadOnlyProperties );
+			
+			message.Reset();
+			
+			Assert.IsNull( message.Content );
+			Assert.IsTrue( message.ReadOnlyBody );
 		}
 
 		[Test]

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs?rev=833220&r1=833219&r2=833220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/ActiveMQMessageTest.cs Thu Nov  5 22:33:48 2009
@@ -445,7 +445,6 @@
             ActiveMQBytesMessage message = new ActiveMQBytesMessage();
             message.ClearBody();
             Assert.IsFalse(message.ReadOnlyBody);
-            Assert.IsNull(message.Content);
         }
     
         [Test]

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs?rev=833220&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs Thu Nov  5 22:33:48 2009
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Text;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ;
+using Apache.NMS.ActiveMQ.Commands;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Test
+{
+    /// <summary>
+    /// Round-trip tests that send each message type through a connection with
+    /// UseCompression enabled and verify that the received message is flagged
+    /// as compressed and still carries the payload that was sent.
+    /// </summary>
+    [TestFixture]
+    public class MessageCompressionTest : NMSTestSupport
+    {
+        protected static string TEST_CLIENT_ID = "MessageCompressionTestClientId";
+        protected static string DESTINATION_NAME = "MessageCompressionTestDest";
+
+        // The following text should compress well
+        private const string TEXT = "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+                                  + "The quick red fox jumped over the lazy brown dog. ";
+
+        // Sample values, at least one per primitive type, that the stream, map
+        // and bytes tests write into a message body and expect to read back.
+        protected bool a = true;
+        protected byte b = 123;
+        protected char c = 'c';
+        protected short d = 0x1234;
+        protected int e = 0x12345678;
+        protected long f = 0x1234567812345678;
+        protected string g = "Hello World!";
+        protected bool h = false;
+        protected byte i = 0xFF;
+        protected short j = -0x1234;
+        protected int k = -0x12345678;
+        protected long l = -0x1234567812345678;
+        protected float m = 2.1F;
+        protected double n = 2.3;
+
+        /// <summary>
+        /// Sends a text message over a compression-enabled connection and checks
+        /// that it is received flagged as compressed with its text unchanged.
+        /// </summary>
+        [Test]
+        public void TestTextMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                // The 'as' cast yields null when the created connection is not a
+                // Connection; fail clearly rather than with a NullReferenceException.
+                Assert.IsNotNull(connection, "CreateConnection did not return a Connection");
+
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    ITextMessage message = session.CreateTextMessage(TEXT);
+                    IDestination destination = session.CreateTemporaryQueue();
+
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ITextMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+                        Assert.AreEqual(TEXT, message.Text);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a stream message holding every sample field over a compression-
+        /// enabled connection and reads the values back in the order written.
+        /// </summary>
+        [Test]
+        public void TestStreamMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                // The 'as' cast yields null when the created connection is not a
+                // Connection; fail clearly rather than with a NullReferenceException.
+                Assert.IsNotNull(connection, "CreateConnection did not return a Connection");
+
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IStreamMessage message = session.CreateStreamMessage();
+                    IDestination destination = session.CreateTemporaryQueue();
+
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        message.WriteBoolean(a);
+                        message.WriteByte(b);
+                        message.WriteChar(c);
+                        message.WriteInt16(d);
+                        message.WriteInt32(e);
+                        message.WriteInt64(f);
+                        message.WriteString(g);
+                        message.WriteBoolean(h);
+                        message.WriteByte(i);
+                        message.WriteInt16(j);
+                        message.WriteInt32(k);
+                        message.WriteInt64(l);
+                        message.WriteSingle(m);
+                        message.WriteDouble(n);
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as IStreamMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+
+                        // Read the values back in the same order they were written.
+                        Assert.AreEqual(a, message.ReadBoolean(), "Stream Boolean Value: a");
+                        Assert.AreEqual(b, message.ReadByte(), "Stream Byte Value: b");
+                        Assert.AreEqual(c, message.ReadChar(), "Stream Char Value: c");
+                        Assert.AreEqual(d, message.ReadInt16(), "Stream Int16 Value: d");
+                        Assert.AreEqual(e, message.ReadInt32(), "Stream Int32 Value: e");
+                        Assert.AreEqual(f, message.ReadInt64(), "Stream Int64 Value: f");
+                        Assert.AreEqual(g, message.ReadString(), "Stream String Value: g");
+                        Assert.AreEqual(h, message.ReadBoolean(), "Stream Boolean Value: h");
+                        Assert.AreEqual(i, message.ReadByte(), "Stream Byte Value: i");
+                        Assert.AreEqual(j, message.ReadInt16(), "Stream Int16 Value: j");
+                        Assert.AreEqual(k, message.ReadInt32(), "Stream Int32 Value: k");
+                        Assert.AreEqual(l, message.ReadInt64(), "Stream Int64 Value: l");
+                        Assert.AreEqual(m, message.ReadSingle(), "Stream Single Value: m");
+                        Assert.AreEqual(n, message.ReadDouble(), "Stream Double Value: n");
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a map message holding every sample field over a compression-
+        /// enabled connection and looks each entry up again on the received copy.
+        /// </summary>
+        [Test]
+        public void TestMapMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                // The 'as' cast yields null when the created connection is not a
+                // Connection; fail clearly rather than with a NullReferenceException.
+                Assert.IsNotNull(connection, "CreateConnection did not return a Connection");
+
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IMapMessage message = session.CreateMapMessage();
+                    IDestination destination = session.CreateTemporaryQueue();
+
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        message.Body["a"] = a;
+                        message.Body["b"] = b;
+                        message.Body["c"] = c;
+                        message.Body["d"] = d;
+                        message.Body["e"] = e;
+                        message.Body["f"] = f;
+                        message.Body["g"] = g;
+                        message.Body["h"] = h;
+                        message.Body["i"] = i;
+                        message.Body["j"] = j;
+                        message.Body["k"] = k;
+                        message.Body["l"] = l;
+                        message.Body["m"] = m;
+                        message.Body["n"] = n;
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as IMapMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+
+                        // Look each entry up under the key it was stored with.
+                        Assert.AreEqual(a, message.Body.GetBool("a"), "map entry: a");
+                        Assert.AreEqual(b, message.Body.GetByte("b"), "map entry: b");
+                        Assert.AreEqual(c, message.Body.GetChar("c"), "map entry: c");
+                        Assert.AreEqual(d, message.Body.GetShort("d"), "map entry: d");
+                        Assert.AreEqual(e, message.Body.GetInt("e"), "map entry: e");
+                        Assert.AreEqual(f, message.Body.GetLong("f"), "map entry: f");
+                        Assert.AreEqual(g, message.Body.GetString("g"), "map entry: g");
+                        Assert.AreEqual(h, message.Body.GetBool("h"), "map entry: h");
+                        Assert.AreEqual(i, message.Body.GetByte("i"), "map entry: i");
+                        Assert.AreEqual(j, message.Body.GetShort("j"), "map entry: j");
+                        Assert.AreEqual(k, message.Body.GetInt("k"), "map entry: k");
+                        Assert.AreEqual(l, message.Body.GetLong("l"), "map entry: l");
+                        Assert.AreEqual(m, message.Body.GetFloat("m"), "map entry: m");
+                        Assert.AreEqual(n, message.Body.GetDouble("n"), "map entry: n");
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a bytes message holding every sample field over a compression-
+        /// enabled connection and reads the values back in the order written.
+        /// </summary>
+        [Test]
+        public void TestBytesMessageCompression()
+        {
+            using(Connection connection = CreateConnection(TEST_CLIENT_ID) as Connection)
+            {
+                // The 'as' cast yields null when the created connection is not a
+                // Connection; fail clearly rather than with a NullReferenceException.
+                Assert.IsNotNull(connection, "CreateConnection did not return a Connection");
+
+                connection.UseCompression = true;
+                connection.Start();
+
+                Assert.IsTrue(connection.UseCompression);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IDestination destination = session.CreateTemporaryQueue();
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    using(IMessageProducer producer = session.CreateProducer(destination))
+                    {
+                        IBytesMessage message = session.CreateBytesMessage();
+
+                        message.WriteBoolean(a);
+                        message.WriteByte(b);
+                        message.WriteChar(c);
+                        message.WriteInt16(d);
+                        message.WriteInt32(e);
+                        message.WriteInt64(f);
+                        message.WriteString(g);
+                        message.WriteBoolean(h);
+                        message.WriteByte(i);
+                        message.WriteInt16(j);
+                        message.WriteInt32(k);
+                        message.WriteInt64(l);
+                        message.WriteSingle(m);
+                        message.WriteDouble(n);
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(receiveTimeout) as IBytesMessage;
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(((ActiveMQMessage) message).Compressed);
+
+                        // Read the values back in the same order they were written.
+                        Assert.AreEqual(a, message.ReadBoolean(), "Bytes Boolean Value: a");
+                        Assert.AreEqual(b, message.ReadByte(), "Bytes Byte Value: b");
+                        Assert.AreEqual(c, message.ReadChar(), "Bytes Char Value: c");
+                        Assert.AreEqual(d, message.ReadInt16(), "Bytes Int16 Value: d");
+                        Assert.AreEqual(e, message.ReadInt32(), "Bytes Int32 Value: e");
+                        Assert.AreEqual(f, message.ReadInt64(), "Bytes Int64 Value: f");
+                        Assert.AreEqual(g, message.ReadString(), "Bytes String Value: g");
+                        Assert.AreEqual(h, message.ReadBoolean(), "Bytes Boolean Value: h");
+                        Assert.AreEqual(i, message.ReadByte(), "Bytes Byte Value: i");
+                        Assert.AreEqual(j, message.ReadInt16(), "Bytes Int16 Value: j");
+                        Assert.AreEqual(k, message.ReadInt32(), "Bytes Int32 Value: k");
+                        Assert.AreEqual(l, message.ReadInt64(), "Bytes Int64 Value: l");
+                        Assert.AreEqual(m, message.ReadSingle(), "Bytes Single Value: m");
+                        Assert.AreEqual(n, message.ReadDouble(), "Bytes Double Value: n");
+                    }
+                }
+            }
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Commands/MessageCompressionTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native