You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2016/09/10 23:09:21 UTC

svn commit: r1760211 [9/12] - in /activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk: ./ src/main/csharp/ src/main/csharp/Readers/ src/main/csharp/Selector/ src/test/csharp/ src/test/csharp/Commands/

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,901 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Collections;
+
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    public class StreamMessage : Message, IStreamMessage
+    {
+        private EndianBinaryReader dataIn = null;
+        private EndianBinaryWriter dataOut = null;
+        private MemoryStream byteBuffer = null;
+        private int bytesRemaining = -1;
+
+        public bool ReadBoolean()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.BOOLEAN_TYPE)
+                    {
+                        return this.dataIn.ReadBoolean();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Boolean.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a bool");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Boolean type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public byte ReadByte()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Byte.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a byte");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Byte type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public char ReadChar()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.CHAR_TYPE)
+                    {
+                        return this.dataIn.ReadChar();
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a char");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Char type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public short ReadInt16()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.SHORT_TYPE)
+                    {
+                        return this.dataIn.ReadInt16();
+                    }
+                    else if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Int16.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a short");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Int16 type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public int ReadInt32()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.INTEGER_TYPE)
+                    {
+                        return this.dataIn.ReadInt32();
+                    }
+                    else if(type == PrimitiveMap.SHORT_TYPE)
+                    {
+                        return this.dataIn.ReadInt16();
+                    }
+                    else if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Int32.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a int");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Int32 type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public long ReadInt64()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.LONG_TYPE)
+                    {
+                        return this.dataIn.ReadInt64();
+                    }
+                    else if(type == PrimitiveMap.INTEGER_TYPE)
+                    {
+                        return this.dataIn.ReadInt32();
+                    }
+                    else if(type == PrimitiveMap.SHORT_TYPE)
+                    {
+                        return this.dataIn.ReadInt16();
+                    }
+                    else if(type == PrimitiveMap.BYTE_TYPE)
+                    {
+                        return this.dataIn.ReadByte();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Int64.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a long");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Int64 type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public float ReadSingle()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.FLOAT_TYPE)
+                    {
+                        return this.dataIn.ReadSingle();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Single.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a float");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Single type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public double ReadDouble()
+        {
+            InitializeReading();
+
+            try
+            {
+                long startingPos = this.byteBuffer.Position;
+                try
+                {
+                    int type = this.dataIn.ReadByte();
+
+                    if(type == PrimitiveMap.DOUBLE_TYPE)
+                    {
+                        return this.dataIn.ReadDouble();
+                    }
+                    else if(type == PrimitiveMap.FLOAT_TYPE)
+                    {
+                        return this.dataIn.ReadSingle();
+                    }
+                    else if(type == PrimitiveMap.STRING_TYPE)
+                    {
+                        return Single.Parse(this.dataIn.ReadString16());
+                    }
+                    else if(type == PrimitiveMap.NULL)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new NMSException("Cannot convert Null type to a double");
+                    }
+                    else
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Value is not a Double type.");
+                    }
+                }
+                catch(FormatException e)
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw NMSExceptionSupport.CreateMessageFormatException(e);
+                }
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public string ReadString()
+        {
+            InitializeReading();
+
+            long startingPos = this.byteBuffer.Position;
+
+            try
+            {
+                int type = this.dataIn.ReadByte();
+
+                if(type == PrimitiveMap.BIG_STRING_TYPE)
+                {
+                    return this.dataIn.ReadString32();
+                }
+                else if(type == PrimitiveMap.STRING_TYPE)
+                {
+                    return this.dataIn.ReadString16();
+                }
+                else if(type == PrimitiveMap.LONG_TYPE)
+                {
+                    return this.dataIn.ReadInt64().ToString();
+                }
+                else if(type == PrimitiveMap.INTEGER_TYPE)
+                {
+                    return this.dataIn.ReadInt32().ToString();
+                }
+                else if(type == PrimitiveMap.SHORT_TYPE)
+                {
+                    return this.dataIn.ReadInt16().ToString();
+                }
+                else if(type == PrimitiveMap.FLOAT_TYPE)
+                {
+                    return this.dataIn.ReadSingle().ToString();
+                }
+                else if(type == PrimitiveMap.DOUBLE_TYPE)
+                {
+                    return this.dataIn.ReadDouble().ToString();
+                }
+                else if(type == PrimitiveMap.CHAR_TYPE)
+                {
+                    return this.dataIn.ReadChar().ToString();
+                }
+                else if(type == PrimitiveMap.BYTE_TYPE)
+                {
+                    return this.dataIn.ReadByte().ToString();
+                }
+                else if(type == PrimitiveMap.BOOLEAN_TYPE)
+                {
+                    return this.dataIn.ReadBoolean().ToString();
+                }
+                else if(type == PrimitiveMap.NULL)
+                {
+                    return null;
+                }
+                else
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw new MessageFormatException("Value is not a known type.");
+                }
+            }
+            catch(FormatException e)
+            {
+                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public int ReadBytes(byte[] value)
+        {
+            InitializeReading();
+
+            if(value == null)
+            {
+                throw new NullReferenceException("Passed Byte Array is null");
+            }
+
+            try
+            {
+                if(this.bytesRemaining == -1)
+                {
+                    long startingPos = this.byteBuffer.Position;
+                    byte type = this.dataIn.ReadByte();
+
+                    if(type != PrimitiveMap.BYTE_ARRAY_TYPE)
+                    {
+                        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                        throw new MessageFormatException("Not a byte array");
+                    }
+
+                    this.bytesRemaining = this.dataIn.ReadInt32();
+                }
+                else if(this.bytesRemaining == 0)
+                {
+                    this.bytesRemaining = -1;
+                    return -1;
+                }
+
+                if(value.Length <= this.bytesRemaining)
+                {
+                    // small buffer
+                    this.bytesRemaining -= value.Length;
+                    this.dataIn.Read(value, 0, value.Length);
+                    return value.Length;
+                }
+                else
+                {
+                    // big buffer
+                    int rc = this.dataIn.Read(value, 0, this.bytesRemaining);
+                    this.bytesRemaining = 0;
+                    return rc;
+                }
+            }
+            catch(EndOfStreamException ex)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(ex);
+            }
+            catch(IOException ex)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ex);
+            }
+        }
+
+        public Object ReadObject()
+        {
+            InitializeReading();
+
+            long startingPos = this.byteBuffer.Position;
+
+            try
+            {
+                int type = this.dataIn.ReadByte();
+
+                if(type == PrimitiveMap.BIG_STRING_TYPE)
+                {
+                    return this.dataIn.ReadString32();
+                }
+                else if(type == PrimitiveMap.STRING_TYPE)
+                {
+                    return this.dataIn.ReadString16();
+                }
+                else if(type == PrimitiveMap.LONG_TYPE)
+                {
+                    return this.dataIn.ReadInt64();
+                }
+                else if(type == PrimitiveMap.INTEGER_TYPE)
+                {
+                    return this.dataIn.ReadInt32();
+                }
+                else if(type == PrimitiveMap.SHORT_TYPE)
+                {
+                    return this.dataIn.ReadInt16();
+                }
+                else if(type == PrimitiveMap.FLOAT_TYPE)
+                {
+                    return this.dataIn.ReadSingle();
+                }
+                else if(type == PrimitiveMap.DOUBLE_TYPE)
+                {
+                    return this.dataIn.ReadDouble();
+                }
+                else if(type == PrimitiveMap.CHAR_TYPE)
+                {
+                    return this.dataIn.ReadChar();
+                }
+                else if(type == PrimitiveMap.BYTE_TYPE)
+                {
+                    return this.dataIn.ReadByte();
+                }
+                else if(type == PrimitiveMap.BOOLEAN_TYPE)
+                {
+                    return this.dataIn.ReadBoolean();
+                }
+                else if(type == PrimitiveMap.BYTE_ARRAY_TYPE)
+                {
+                    int length = this.dataIn.ReadInt32();
+                    byte[] data = new byte[length];
+                    this.dataIn.Read(data, 0, length);
+                    return data;
+                }
+                else if(type == PrimitiveMap.NULL)
+                {
+                    return null;
+                }
+                else
+                {
+                    this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                    throw new MessageFormatException("Value is not a known type.");
+                }
+            }
+            catch(FormatException e)
+            {
+                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+            catch(EndOfStreamException e)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch(IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        public void WriteBoolean(bool value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteByte(byte value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.BYTE_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteBytes(byte[] value)
+        {
+            InitializeWriting();
+            this.WriteBytes(value, 0, value.Length);
+        }
+
+        public void WriteBytes(byte[] value, int offset, int length)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE);
+                this.dataOut.Write((int) length);
+                this.dataOut.Write(value, offset, length);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteChar(char value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.CHAR_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteInt16(short value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.SHORT_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteInt32(int value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.INTEGER_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteInt64(long value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.LONG_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteSingle(float value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.FLOAT_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteDouble(double value)
+        {
+            InitializeWriting();
+            try
+            {
+                this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE);
+                this.dataOut.Write(value);
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteString(string value)
+        {
+            InitializeWriting();
+            try
+            {
+                if( value.Length > 8192 )
+                {
+                    this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE);
+                    this.dataOut.WriteString32(value);
+                }
+                else
+                {
+                    this.dataOut.Write(PrimitiveMap.STRING_TYPE);
+                    this.dataOut.WriteString16(value);
+                }
+            }
+            catch(IOException e)
+            {
+                NMSExceptionSupport.Create(e);
+            }
+        }
+
+        public void WriteObject(Object value)
+        {
+            InitializeWriting();
+            if( value is System.Byte )
+            {
+                this.WriteByte( (byte) value );
+            }
+            else if( value is Char )
+            {
+                this.WriteChar( (char) value );
+            }
+            else if( value is Boolean )
+            {
+                this.WriteBoolean( (bool) value );
+            }
+            else if( value is Int16 )
+            {
+                this.WriteInt16( (short) value );
+            }
+            else if( value is Int32 )
+            {
+                this.WriteInt32( (int) value );
+            }
+            else if( value is Int64 )
+            {
+                this.WriteInt64( (long) value );
+            }
+            else if( value is Single )
+            {
+                this.WriteSingle( (float) value );
+            }
+            else if( value is Double )
+            {
+                this.WriteDouble( (double) value );
+            }
+            else if( value is byte[] )
+            {
+                this.WriteBytes( (byte[]) value );
+            }
+            else if( value is String )
+            {
+                this.WriteString( (string) value );
+            }
+            else
+            {
+                throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+            }
+        }
+
+        public override Object Clone()
+        {
+            StoreContent();
+            return base.Clone();
+        }
+
+        public override void ClearBody()
+        {
+            base.ClearBody();
+            this.byteBuffer = null;
+            this.dataIn = null;
+            this.dataOut = null;
+            this.bytesRemaining = -1;
+        }
+
+        public void Reset()
+        {
+            StoreContent();
+            this.dataIn = null;
+            this.dataOut = null;
+            this.byteBuffer = null;
+            this.bytesRemaining = -1;
+            this.ReadOnlyBody = true;
+        }
+
+        private void InitializeReading()
+        {
+            FailIfWriteOnlyBody();
+            if(this.dataIn == null)
+            {
+                this.byteBuffer = new MemoryStream(this.Content, false);
+                this.dataIn = new EndianBinaryReader(this.byteBuffer);
+            }
+        }
+
+        private void InitializeWriting()
+        {
+            FailIfReadOnlyBody();
+            if(this.dataOut == null)
+            {
+                this.byteBuffer = new MemoryStream();
+                this.dataOut = new EndianBinaryWriter(this.byteBuffer);
+            }
+        }
+
+        private void StoreContent()
+        {
+            if( dataOut != null)
+            {
+                dataOut.Close();
+
+                this.Content = byteBuffer.ToArray();
+                this.dataOut = null;
+                this.byteBuffer = null;
+            }
+        }
+
+    }
+}
+

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Commands
+{
+    public abstract class TempDestination : Destination
+    {
+        /// <summary>
+        /// Method CreateDestination
+        /// </summary>
+        /// <returns>An Destination</returns>
+        /// <param name="name">A  String</param>
+        public override Destination CreateDestination(String name)
+        {
+            return null;
+        }
+
+        abstract override public DestinationType DestinationType
+        {
+            get;
+        }
+
+        public TempDestination()
+            : base()
+        {
+        }
+
+        public TempDestination(String name)
+            : base(name)
+        {
+        }
+
+        public override Object Clone()
+        {
+            // Since we are a derived class use the base's Clone()
+            // to perform the shallow copy. Since it is shallow it
+            // will include our derived class. Since we are derived,
+            // this method is an override.
+            TempDestination o = (TempDestination) base.Clone();
+
+            // Now do the deep work required
+            // If any new variables are added then this routine will
+            // likely need updating
+
+            return o;
+        }
+
+        public void Delete()
+        {
+            throw new NotSupportedException("Stomp Cannot Delete Temporary Destinations");
+        }
+    }
+}
+

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// A Temporary Queue
+    /// </summary>
+    public class TempQueue : TempDestination, ITemporaryQueue
+    {
+        public TempQueue()
+            : base()
+        {
+        }
+
+        public TempQueue(String name)
+            : base(name)
+        {
+        }
+
+        override public DestinationType DestinationType
+        {
+            get
+            {
+                return DestinationType.TemporaryQueue;
+            }
+        }
+
+        public String QueueName
+        {
+            get { return PhysicalName; }
+        }
+
+        public String GetQueueName()
+        {
+            return PhysicalName;
+        }
+
+        public override int GetDestinationType()
+        {
+            return TEMPORARY_QUEUE;
+        }
+
+        public override Destination CreateDestination(String name)
+        {
+            return new TempQueue(name);
+        }
+
+        public override Object Clone()
+        {
+            // Since we are a derived class use the base's Clone()
+            // to perform the shallow copy. Since it is shallow it
+            // will include our derived class. Since we are derived,
+            // this method is an override.
+            TempQueue o = (TempQueue) base.Clone();
+
+            // Now do the deep work required
+            // If any new variables are added then this routine will
+            // likely need updating
+
+            return o;
+        }
+
+    }
+}
+

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+
+    /// <summary>
+    /// A Temporary Topic
+    /// </summary>
+    public class TempTopic : TempDestination, ITemporaryTopic
+    {
+        public TempTopic() : base()
+        {
+        }
+
+        public TempTopic(String name) : base(name)
+        {
+        }
+
+        override public DestinationType DestinationType
+        {
+            get { return DestinationType.TemporaryTopic; }
+        }
+
+        public String TopicName
+        {
+            get { return PhysicalName; }
+        }
+
+        public String GetTopicName()
+        {
+            return PhysicalName;
+        }
+
+        public override int GetDestinationType()
+        {
+            return TEMPORARY_TOPIC;
+        }
+
+        public override Destination CreateDestination(String name)
+        {
+            return new TempTopic(name);
+        }
+
+        public override Object Clone()
+        {
+            // Since we are a derived class use the base's Clone()
+            // to perform the shallow copy. Since it is shallow it
+            // will include our derived class. Since we are derived,
+            // this method is an override.
+            TempTopic o = (TempTopic) base.Clone();
+
+            // Now do the deep work required
+            // If any new variables are added then this routine will
+            // likely need updating
+
+            return o;
+        }
+
+    }
+}
+

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    public class TextMessage : Message, ITextMessage
+    {
+        private String text = null;
+
+        public TextMessage()
+        {
+        }
+
+        public TextMessage(String text)
+        {
+            this.Text = text;
+        }
+
+        public override string ToString()
+        {
+            string text = this.Text;
+
+            if(text != null && text.Length > 63)
+            {
+                text = text.Substring(0, 45) + "..." + text.Substring(text.Length - 12);
+            }
+            return base.ToString() + " Text = " + (text ?? "null");
+        }
+
+        public override void ClearBody()
+        {
+            base.ClearBody();
+            this.text = null;
+        }
+
+        // Properties
+
+        public string Text
+        {
+            get { return this.text; }
+            set
+            {
+                FailIfReadOnlyBody();
+                this.text = value;
+                this.Content = null;
+            }
+        }
+    }
+}
+

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+
+    /// <summary>
+    /// Summary description for Topic.
+    /// </summary>
+    public class Topic : Destination, ITopic
+    {
+        public Topic() : base()
+        {
+        }
+
+        public Topic(String name) : base(name)
+        {
+        }
+
+        override public DestinationType DestinationType
+        {
+            get
+            {
+                return DestinationType.Topic;
+            }
+        }
+
+        public String TopicName
+        {
+            get { return PhysicalName; }
+        }
+
+        public override int GetDestinationType()
+        {
+            return TOPIC;
+        }
+
+        public override Destination CreateDestination(String name)
+        {
+            return new Topic(name);
+        }
+
+        public override Object Clone()
+        {
+            // Since we are a derived class use the base's Clone()
+            // to perform the shallow copy. Since it is shallow it
+            // will include our derived class. Since we are derived,
+            // this method is an override.
+            Topic o = (Topic) base.Clone();
+
+            // Now do the deep work required
+            // If any new variables are added then this routine will
+            // likely need updating
+
+            return o;
+        }
+    }
+}
+

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+	//[TestFixture]
+	public class ConnectionTest : NMSTest
+	{
+		IConnection startedConnection = null;
+		IConnection stoppedConnection = null;
+
+		protected ConnectionTest(NMSTestSupport testSupport)
+			: base(testSupport)
+		{
+		}
+
+		//[SetUp]
+		public override void SetUp()
+		{
+			base.SetUp();
+
+			startedConnection = CreateConnection(null);
+			startedConnection.Start();
+			stoppedConnection = CreateConnection(null);
+		}
+
+		//[TearDown]
+		public override void TearDown()
+		{
+			startedConnection.Close();
+			stoppedConnection.Close();
+
+			base.TearDown();
+		}
+
+		/// <summary>
+		/// Verify that it is possible to create multiple connections to the broker.
+		/// There was a bug in the connection factory which set the clientId member which made
+		/// it impossible to create an additional connection.
+		/// </summary>
+		//[Test]
+		public virtual void TestTwoConnections()
+		{
+			using(IConnection connection1 = CreateConnection(null))
+			{
+				connection1.Start();
+				using(IConnection connection2 = CreateConnection(null))
+				{
+					// with the bug present we'll get an exception in connection2.start()
+					connection2.Start();
+				}
+			}
+		}
+
+		//[Test]
+		public virtual void TestCreateAndDisposeWithConsumer(
+			//[Values(true, false)]
+			bool disposeConsumer, string testDestRef)
+		{
+			using(IConnection connection = CreateConnection("DisposalTestConnection"))
+			{
+				connection.Start();
+
+				using(ISession session = connection.CreateSession())
+				{
+					IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+					IMessageConsumer consumer = session.CreateConsumer(destination);
+
+					connection.Stop();
+					if(disposeConsumer)
+					{
+						consumer.Dispose();
+					}
+				}
+			}
+		}
+
+		//[Test]
+		public virtual void TestCreateAndDisposeWithProducer(
+			//[Values(true, false)]
+			bool disposeProducer, string testDestRef)
+		{
+			using(IConnection connection = CreateConnection("DisposalTestConnection"))
+			{
+				connection.Start();
+
+				using(ISession session = connection.CreateSession())
+				{
+					IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+					IMessageProducer producer = session.CreateProducer(destination);
+
+					connection.Stop();
+					if(disposeProducer)
+					{
+						producer.Dispose();
+					}
+				}
+			}
+		}
+
+		//[Test]
+		public virtual void TestStartAfterSend(
+			//[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+			MsgDeliveryMode deliveryMode,
+			string testDestRef)
+		{
+			using(IConnection connection = CreateConnection(GetTestClientId()))
+			{
+				ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+
+				// Send the messages
+				SendMessages(session, destination, deliveryMode, 1);
+
+				// Start the conncection after the message was sent.
+				connection.Start();
+
+				// Make sure only 1 message was delivered.
+				Assert.IsNotNull(consumer.Receive(TimeSpan.FromMilliseconds(1000)));
+				Assert.IsNull(consumer.ReceiveNoWait());
+			}
+		}
+
+		/// <summary>
+		/// Tests if the consumer receives the messages that were sent before the
+		/// connection was started.
+		/// </summary>
+		//[Test]
+		public virtual void TestStoppedConsumerHoldsMessagesTillStarted(string testDestRef)
+		{
+			ISession startedSession = startedConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ISession stoppedSession = stoppedConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+			// Setup the consumers.
+			IDestination destination = GetClearDestinationByNodeReference(startedSession, testDestRef);
+			IMessageConsumer startedConsumer = startedSession.CreateConsumer(destination);
+			IMessageConsumer stoppedConsumer = stoppedSession.CreateConsumer(destination);
+
+			// Send the message.
+			IMessageProducer producer = startedSession.CreateProducer(destination);
+			ITextMessage message = startedSession.CreateTextMessage("Hello");
+			producer.Send(message);
+
+			// Test the assertions.
+			IMessage m;
+            if(destination.IsTopic)
+            {
+                m = startedConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+			    Assert.IsNotNull(m);
+            }
+
+			m = stoppedConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+			Assert.IsNull(m);
+
+			stoppedConnection.Start();
+			m = stoppedConsumer.Receive(TimeSpan.FromMilliseconds(5000));
+			Assert.IsNotNull(m);
+
+			startedSession.Close();
+			stoppedSession.Close();
+		}
+
+		/// <summary>
+		/// Tests if the consumer is able to receive messages even when the
+		/// connecction restarts multiple times.
+		/// </summary>
+		//[Test]
+		public virtual void TestMultipleConnectionStops(string testDestRef)
+		{
+			TestStoppedConsumerHoldsMessagesTillStarted(testDestRef);
+			stoppedConnection.Stop();
+			TestStoppedConsumerHoldsMessagesTillStarted(testDestRef);
+			stoppedConnection.Stop();
+			TestStoppedConsumerHoldsMessagesTillStarted(testDestRef);
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+	//[TestFixture]
+	public class ConsumerTest : NMSTest
+	{
+		protected const int COUNT = 25;
+		protected const string VALUE_NAME = "value";
+
+		private bool dontAck;
+
+		protected ConsumerTest(NMSTestSupport testSupport)
+			: base(testSupport)
+		{
+		}
+
+// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
+#if !NETCF
+		//[Test]
+		public virtual void TestNoTimeoutConsumer(
+            string testDestRef,
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode)
+		{
+			// Launch a thread to perform IMessageConsumer.Receive().
+			// If it doesn't fail in less than three seconds, no exception was thrown.
+			Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(ackMode))
+				{
+					IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+					using(this.timeoutConsumer = session.CreateConsumer(destination))
+					{
+						receiveThread.Start();
+						if(receiveThread.Join(3000))
+						{
+							Assert.Fail("IMessageConsumer.Receive() returned without blocking.  Test failed.");
+						}
+						else
+						{
+                            try
+                            {
+							    // Kill the thread - otherwise it'll sit in Receive() until a message arrives.
+							    receiveThread.Interrupt();
+                                // MSMQ MessageQueue.Receive is interrupted neither by Thread.Interrupt, nor by
+                                // Thread.Abort. Send a dummy message to stop the background thread.
+                                IMessageProducer producer = session.CreateProducer(destination);
+                                producer.Send(session.CreateMessage());
+                            }
+                            catch
+                            {
+                            }
+						}
+					}
+				}
+			}
+		}
+
+		protected IMessageConsumer timeoutConsumer;
+
+		protected void TimeoutConsumerThreadProc()
+		{
+			try
+			{
+				timeoutConsumer.Receive();
+			}
+			catch(ArgumentOutOfRangeException e)
+			{
+				// The test failed.  We will know because the timeout will expire inside TestNoTimeoutConsumer().
+				Assert.Fail("Test failed with exception: " + e.Message);
+			}
+			catch(ThreadInterruptedException)
+			{
+				// The test succeeded!  We were still blocked when we were interrupted.
+			}
+			catch(Exception e)
+			{
+				// Some other exception occurred.
+				Assert.Fail("Test failed with exception: " + e.Message);
+			}
+		}
+
+		//[Test]
+		public virtual void TestSyncReceiveConsumerClose(
+            string testDestRef,
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode)
+		{
+			// Launch a thread to perform IMessageConsumer.Receive().
+			// If it doesn't fail in less than three seconds, no exception was thrown.
+			Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+			using (IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using (ISession session = connection.CreateSession(ackMode))
+				{
+					IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+					using (this.timeoutConsumer = session.CreateConsumer(destination))
+					{
+						receiveThread.Start();
+						if (receiveThread.Join(3000))
+						{
+							Assert.Fail("IMessageConsumer.Receive() returned without blocking.  Test failed.");
+						}
+						else
+						{
+							// Kill the thread - otherwise it'll sit in Receive() until a message arrives.
+							this.timeoutConsumer.Close();
+							receiveThread.Join(10000);
+							if (receiveThread.IsAlive)
+							{
+								Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it.");
+                                try
+                                {
+							        // Kill the thread - otherwise it'll sit in Receive() until a message arrives.
+							        receiveThread.Interrupt();
+                                    // MSMQ MessageQueue.Receive is interrupted neither by Thread.Interrupt, nor by
+                                    // Thread.Abort. Send a dummy message to stop the background thread.
+                                    IMessageProducer producer = session.CreateProducer(destination);
+                                    producer.Send(session.CreateMessage());
+                                }
+                                catch
+                                {
+                                }
+							}
+						}
+					}
+				}
+			}
+		}
+
+		internal class ThreadArg
+		{
+			internal IConnection connection = null;
+			internal ISession session = null;
+			internal IDestination destination = null;
+		}
+
+		protected void DelayedProducerThreadProc(Object arg)
+		{
+			try
+			{
+				ThreadArg args = arg as ThreadArg;
+
+				using(ISession session = args.connection.CreateSession())
+				{
+					using(IMessageProducer producer = session.CreateProducer(args.destination))
+					{
+						// Give the consumer time to enter the receive.
+						Thread.Sleep(5000);
+
+						producer.Send(args.session.CreateTextMessage("Hello World"));
+					}
+				}
+			}
+			catch(Exception e)
+			{
+				// Some other exception occurred.
+				Assert.Fail("Test failed with exception: " + e.Message);
+			}
+		}
+
+		//[Test]
+		public virtual void TestDoChangeSentMessage(
+            string testDestRef,
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode,
+			//[Values(true, false)]
+			bool doClear)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(ackMode))
+				{
+					IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+					using(IMessageConsumer consumer = session.CreateConsumer(destination))
+					{
+						IMessageProducer producer = session.CreateProducer(destination);
+						ITextMessage message = session.CreateTextMessage();
+
+						string prefix = "ConsumerTest - TestDoChangeSentMessage: ";
+
+						for(int i = 0; i < COUNT; i++)
+						{
+							message.Properties[VALUE_NAME] = i;
+							message.Text = prefix + Convert.ToString(i);
+
+							producer.Send(message);
+
+							if(doClear)
+							{
+								message.ClearBody();
+								message.ClearProperties();
+							}
+						}
+
+						if(ackMode == AcknowledgementMode.Transactional)
+						{
+							session.Commit();
+						}
+
+						for(int i = 0; i < COUNT; i++)
+						{
+							ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+							Assert.AreEqual(msg.Text, prefix + Convert.ToString(i));
+							Assert.AreEqual(msg.Properties.GetInt(VALUE_NAME), i);
+						}
+
+						if(ackMode == AcknowledgementMode.Transactional)
+						{
+							session.Commit();
+						}
+
+					}
+				}
+			}
+		}
+
+		//[Test]
+		public virtual void TestConsumerReceiveBeforeMessageDispatched(
+            string testDestRef,
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode)
+		{
+			// Launch a thread to perform a delayed send.
+			Thread sendThread = new Thread(DelayedProducerThreadProc);
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(ackMode))
+				{
+					IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+					using(IMessageConsumer consumer = session.CreateConsumer(destination))
+					{
+						ThreadArg arg = new ThreadArg();
+
+						arg.connection = connection;
+						arg.session = session;
+						arg.destination = destination;
+
+						sendThread.Start(arg);
+						IMessage message = consumer.Receive(TimeSpan.FromMinutes(1));
+						Assert.IsNotNull(message);
+					}
+				}
+			}
+		}
+
+		//[Test]
+		public virtual void TestDontStart(
+            string testDestRef,
+			//[Values(MsgDeliveryMode.NonPersistent)]
+			MsgDeliveryMode deliveryMode)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				ISession session = connection.CreateSession();
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+
+				// Send the messages
+				SendMessages(session, destination, deliveryMode, 1);
+
+				// Make sure no messages were delivered.
+				Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(1000)));
+			}
+		}
+
+		//[Test]
+		public virtual void TestSendReceiveTransacted(
+            string testDestRef,
+			//[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+			MsgDeliveryMode deliveryMode)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				// Send a message to the broker.
+				connection.Start();
+				ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+				IMessageProducer producer = session.CreateProducer(destination);
+
+				producer.DeliveryMode = deliveryMode;
+				producer.Send(session.CreateTextMessage("Test"));
+
+				// Message should not be delivered until commit.
+				Thread.Sleep(1000);
+				Assert.IsNull(consumer.ReceiveNoWait());
+				session.Commit();
+
+				// Make sure only 1 message was delivered.
+				IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNotNull(message);
+				Assert.IsFalse(message.NMSRedelivered);
+				Assert.IsNull(consumer.ReceiveNoWait());
+
+				// Message should be redelivered is rollback is used.
+				session.Rollback();
+
+				// Make sure only 1 message was delivered.
+				message = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+				Assert.IsNotNull(message);
+				Assert.IsTrue(message.NMSRedelivered);
+				Assert.IsNull(consumer.ReceiveNoWait());
+
+				// If we commit now, the message should not be redelivered.
+				session.Commit();
+				Thread.Sleep(1000);
+				Assert.IsNull(consumer.ReceiveNoWait());
+			}
+		}
+
+		//[Test]
+		public virtual void TestAckedMessageAreConsumed(string testDestRef)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageProducer producer = session.CreateProducer(destination);
+				producer.Send(session.CreateTextMessage("Hello"));
+
+				// Consume the message...
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+				IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNotNull(msg);
+				msg.Acknowledge();
+
+				// Reset the session.
+				session.Close();
+				session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+				// Attempt to Consume the message...
+				consumer = session.CreateConsumer(destination);
+				msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNull(msg);
+
+				session.Close();
+			}
+		}
+
+		//[Test]
+		public virtual void TestLastMessageAcked(string testDestRef)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageProducer producer = session.CreateProducer(destination);
+				producer.Send(session.CreateTextMessage("Hello"));
+				producer.Send(session.CreateTextMessage("Hello2"));
+				producer.Send(session.CreateTextMessage("Hello3"));
+
+				// Consume the message...
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+				IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNotNull(msg);
+				msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNotNull(msg);
+				msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNotNull(msg);
+				msg.Acknowledge();
+
+				// Reset the session.
+				session.Close();
+				session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+				// Attempt to Consume the message...
+				consumer = session.CreateConsumer(destination);
+				msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNull(msg);
+
+				session.Close();
+			}
+		}
+
+		//[Test]
+		public virtual void TestUnAckedMessageAreNotConsumedOnSessionClose(string testDestRef)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageProducer producer = session.CreateProducer(destination);
+				producer.Send(session.CreateTextMessage("Hello"));
+
+				// Consume the message...
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+				IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNotNull(msg);
+				// Don't ack the message.
+
+				// Reset the session.  This should cause the unacknowledged message to be re-delivered.
+				session.Close();
+				session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+				// Attempt to Consume the message...
+				consumer = session.CreateConsumer(destination);
+				msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+				Assert.IsNotNull(msg);
+				msg.Acknowledge();
+
+				session.Close();
+			}
+		}
+
+		//[Test]
+		public virtual void TestAsyncAckedMessageAreConsumed(string testDestRef)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageProducer producer = session.CreateProducer(destination);
+				producer.Send(session.CreateTextMessage("Hello"));
+
+				// Consume the message...
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+				consumer.Listener += new MessageListener(OnMessage);
+
+				Thread.Sleep(5000);
+
+				// Reset the session.
+				session.Close();
+
+				session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+				// Attempt to Consume the message...
+				consumer = session.CreateConsumer(destination);
+				IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+				Assert.IsNull(msg);
+
+				session.Close();
+			}
+		}
+
+		//[Test]
+		public virtual void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose(string testDestRef)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				// don't aknowledge message on onMessage() call
+				dontAck = true;
+				ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageProducer producer = session.CreateProducer(destination);
+				producer.Send(session.CreateTextMessage("Hello"));
+
+				// Consume the message...
+				using(IMessageConsumer consumer = session.CreateConsumer(destination))
+				{
+					consumer.Listener += new MessageListener(OnMessage);
+					// Don't ack the message.
+				}
+
+				// Reset the session. This should cause the Unacked message to be
+				// redelivered.
+				session.Close();
+
+				Thread.Sleep(5000);
+				session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				// Attempt to Consume the message...
+				using(IMessageConsumer consumer = session.CreateConsumer(destination))
+				{
+					IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+					Assert.IsNotNull(msg);
+					msg.Acknowledge();
+				}
+
+				session.Close();
+			}
+		}
+
+		//[Test]
+		public virtual void TestAddRemoveAsnycMessageListener(DestinationType destType, string testDestRef)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+
+				ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+				IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+				IMessageConsumer consumer = session.CreateConsumer(destination);
+
+				consumer.Listener += OnMessage;
+				consumer.Listener -= OnMessage;
+				consumer.Listener += OnMessage;
+
+				consumer.Close();
+			}
+		}
+
+		public void OnMessage(IMessage message)
+		{
+			Assert.IsNotNull(message);
+
+			if(!dontAck)
+			{
+				try
+				{
+					message.Acknowledge();
+				}
+				catch(Exception)
+				{
+				}
+			}
+		}
+
+		//[Test]
+		public virtual void TestReceiveNoWait(
+            string testDestRef,
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode,
+			//[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+			MsgDeliveryMode deliveryMode)
+		{
+			const int RETRIES = 20;
+
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(ackMode))
+				{
+				    IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+
+					using(IMessageProducer producer = session.CreateProducer(destination))
+					{
+						producer.DeliveryMode = deliveryMode;
+						ITextMessage message = session.CreateTextMessage("TEST");
+						producer.Send(message);
+
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+					}
+
+					using(IMessageConsumer consumer = session.CreateConsumer(destination))
+					{
+						IMessage message = null;
+
+						for(int i = 0; i < RETRIES && message == null; ++i)
+						{
+							message = consumer.ReceiveNoWait();
+							Thread.Sleep(100);
+						}
+
+						Assert.IsNotNull(message);
+						message.Acknowledge();
+
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+					}
+				}
+			}
+		}
+
+#endif
+
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+	//[TestFixture]
+	public class DurableTest : NMSTest
+	{
+		protected static string DURABLE_SELECTOR = "2 > 1";
+
+		protected string TEST_CLIENT_AND_CONSUMER_ID;
+		protected string SEND_CLIENT_ID;
+
+		protected DurableTest(NMSTestSupport testSupport)
+			: base(testSupport)
+		{
+		}
+
+		//[SetUp]
+		public override void SetUp()
+		{
+			base.SetUp();
+			
+			TEST_CLIENT_AND_CONSUMER_ID = GetTestClientId();
+			SEND_CLIENT_ID = GetTestClientId();
+		}
+
+		//[Test]
+		public virtual void TestSendWhileClosed(
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode, string testTopicRef)
+		{
+			try
+			{				
+		        using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+				{
+			        connection.Start();
+
+					using(ISession session = connection.CreateSession(ackMode))
+					{
+						ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef);
+						IMessageProducer producer = session.CreateProducer(topic);
+
+						producer.DeliveryMode = MsgDeliveryMode.Persistent;
+										
+				        ISession consumeSession = connection.CreateSession(ackMode);
+				        IMessageConsumer consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+				        Thread.Sleep(1000);
+				        consumer.Dispose();
+						consumer = null;
+				        
+						ITextMessage message = session.CreateTextMessage("DurableTest-TestSendWhileClosed");
+				        message.Properties.SetString("test", "test");
+				        message.NMSType = "test";
+				        producer.Send(message);
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+										        
+				        Thread.Sleep(1000);
+						consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+				        ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
+						msg.Acknowledge();
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							consumeSession.Commit();
+						}
+						
+						Assert.IsNotNull(msg);
+				        Assert.AreEqual(msg.Text, "DurableTest-TestSendWhileClosed");
+				        Assert.AreEqual(msg.NMSType, "test");
+				        Assert.AreEqual(msg.Properties.GetString("test"), "test");
+					}
+				}
+			}
+			catch(Exception ex)
+			{
+				Assert.Fail(ex.Message);
+			}
+			finally
+			{
+                // Pause to allow Stomp to unregister at the broker.
+                Thread.Sleep(500);
+
+				UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+			}			
+	    }		
+		
+		//[Test]
+		public void TestDurableConsumerSelectorChange(
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode, string testTopicRef)
+		{
+			try
+			{
+				using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+				{
+					connection.Start();
+					using(ISession session = connection.CreateSession(ackMode))
+					{
+						ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef);
+						IMessageProducer producer = session.CreateProducer(topic);
+						IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='red'", false);
+
+						producer.DeliveryMode = MsgDeliveryMode.Persistent;
+
+						// Send the messages
+						ITextMessage sendMessage = session.CreateTextMessage("1st");
+						sendMessage.Properties["color"] = "red";
+						producer.Send(sendMessage);
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+
+						ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
+						Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
+						Assert.AreEqual("1st", receiveMsg.Text);
+						Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
+						receiveMsg.Acknowledge();
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+
+						// Change the subscription, allowing some time for the Broker to purge the
+						// consumers resources.
+						consumer.Dispose();
+                        Thread.Sleep(1000);
+						
+						consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='blue'", false);
+
+						sendMessage = session.CreateTextMessage("2nd");
+						sendMessage.Properties["color"] = "red";
+						producer.Send(sendMessage);
+						sendMessage = session.CreateTextMessage("3rd");
+						sendMessage.Properties["color"] = "blue";
+						producer.Send(sendMessage);
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+
+						// Selector should skip the 2nd message.
+						receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
+						Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
+						Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
+						Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
+						receiveMsg.Acknowledge();
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+
+						// Make sure there are no pending messages.
+						Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
+					}
+				}
+			}
+			catch(Exception ex)
+			{
+				Assert.Fail(ex.Message);
+			}
+			finally
+			{
+                // Pause to allow Stomp to unregister at the broker.
+                Thread.Sleep(500);
+
+				UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+			}
+		}
+
+		//[Test]
+		public void TestDurableConsumer(
+			//[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+			//	AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+			AcknowledgementMode ackMode, string testDurableTopicName)
+		{
+			try
+			{
+				RegisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, testDurableTopicName, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+				RunTestDurableConsumer(testDurableTopicName, ackMode);
+				if(AcknowledgementMode.Transactional == ackMode)
+				{
+					RunTestDurableConsumer(testDurableTopicName, ackMode);
+				}
+			}
+			finally
+			{
+                // Pause to allow Stomp to unregister at the broker.
+                Thread.Sleep(500);
+				
+				UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+			}
+		}
+
+		protected void RunTestDurableConsumer(string topicName, AcknowledgementMode ackMode)
+		{
+			SendDurableMessage(topicName);
+			SendDurableMessage(topicName);
+
+			using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(ackMode))
+				{
+					ITopic topic = SessionUtil.GetTopic(session, topicName);
+					using(IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false))
+					{
+						IMessage msg = consumer.Receive(receiveTimeout);
+						Assert.IsNotNull(msg, "Did not receive first durable message.");
+						msg.Acknowledge();
+
+						msg = consumer.Receive(receiveTimeout);
+						Assert.IsNotNull(msg, "Did not receive second durable message.");
+						msg.Acknowledge();
+
+						if(AcknowledgementMode.Transactional == ackMode)
+						{
+							session.Commit();
+						}
+					}
+				}
+			}
+		}
+
+		protected void SendDurableMessage(string topicName)
+		{
+			using(IConnection connection = CreateConnection(SEND_CLIENT_ID))
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession())
+				{
+					ITopic topic = SessionUtil.GetTopic(session, topicName);
+					using(IMessageProducer producer = session.CreateProducer(topic))
+					{
+						ITextMessage message = session.CreateTextMessage("Durable Hello");
+
+						producer.DeliveryMode = MsgDeliveryMode.Persistent;
+						producer.Send(message);
+					}
+				}
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+	[TestFixture]
+	public class EndianBinaryReaderTest
+	{
+		public void readString16Helper(byte[] input, char[] expect)
+		{
+			MemoryStream stream = new MemoryStream(input);
+			EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+			char[] result = reader.ReadString16().ToCharArray();
+
+			for(int i = 0; i < expect.Length; ++i)
+			{
+				Assert.AreEqual(expect[i], result[i]);
+			}
+		}
+
+		[Test]
+		public void testReadString16_1byteUTF8encoding()
+		{
+			// Test data with 1-byte UTF8 encoding.
+			char[] expect = { '\u0000', '\u000B', '\u0048', '\u0065', '\u006C', '\u006C', '\u006F', '\u0020', '\u0057', '\u006F', '\u0072', '\u006C', '\u0064' };
+			byte[] input = { 0x00, 0x0E, 0xC0, 0x80, 0x0B, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 0x6F, 0x72, 0x6C, 0x64 };
+
+			readString16Helper(input, expect);
+		}
+
+		[Test]
+		public void testReadString16_2byteUTF8encoding()
+		{
+			// Test data with 2-byte UT8 encoding.
+			char[] expect = { '\u0000', '\u00C2', '\u00A9', '\u00C3', '\u00A6' };
+			byte[] input = { 0x00, 0x0A, 0xC0, 0x80, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC2, 0xA6 };
+			readString16Helper(input, expect);
+		}
+
+		[Test]
+		public void testReadString16_1byteAnd2byteEmbeddedNULLs()
+		{
+			// Test data with 1-byte and 2-byte encoding with embedded NULL's.
+			char[] expect = { '\u0000', '\u0004', '\u00C2', '\u00A9', '\u00C3', '\u0000', '\u00A6' };
+			byte[] input = { 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+			readString16Helper(input, expect);
+		}
+
+		[Test]
+		[ExpectedException(typeof(IOException))]
+		public void testReadString16_UTF8Missing2ndByte()
+		{
+			// Test with bad UTF-8 encoding, missing 2nd byte of two byte value
+			byte[] input = { 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xC2, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+			MemoryStream stream = new MemoryStream(input);
+			EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+			reader.ReadString16();
+		}
+
+		[Test]
+		[ExpectedException(typeof(IOException))]
+		public void testReadString16_3byteEncodingMissingLastByte()
+		{
+			// Test with three byte encode that's missing a last byte.
+			byte[] input = { 0x00, 0x02, 0xE8, 0xA8 };
+
+			MemoryStream stream = new MemoryStream(input);
+			EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+			reader.ReadString16();
+		}
+
+		public void readString32Helper(byte[] input, char[] expect)
+		{
+			MemoryStream stream = new MemoryStream(input);
+			EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+			char[] result = reader.ReadString32().ToCharArray();
+
+			for(int i = 0; i < expect.Length; ++i)
+			{
+				Assert.AreEqual(expect[i], result[i]);
+			}
+		}
+
+		[Test]
+		public void testReadString32_1byteUTF8encoding()
+		{
+			// Test data with 1-byte UTF8 encoding.
+			char[] expect = { '\u0000', '\u000B', '\u0048', '\u0065', '\u006C', '\u006C', '\u006F', '\u0020', '\u0057', '\u006F', '\u0072', '\u006C', '\u0064' };
+			byte[] input = { 0x00, 0x00, 0x00, 0x0E, 0xC0, 0x80, 0x0B, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 0x6F, 0x72, 0x6C, 0x64 };
+
+			readString32Helper(input, expect);
+		}
+
+		[Test]
+		public void testReadString32_2byteUTF8encoding()
+		{
+			// Test data with 2-byte UT8 encoding.
+			char[] expect = { '\u0000', '\u00C2', '\u00A9', '\u00C3', '\u00A6' };
+			byte[] input = { 0x00, 0x00, 0x00, 0x0A, 0xC0, 0x80, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC2, 0xA6 };
+			readString32Helper(input, expect);
+		}
+
+		[Test]
+		public void testReadString32_1byteAnd2byteEmbeddedNULLs()
+		{
+			// Test data with 1-byte and 2-byte encoding with embedded NULL's.
+			char[] expect = { '\u0000', '\u0004', '\u00C2', '\u00A9', '\u00C3', '\u0000', '\u00A6' };
+			byte[] input = { 0x00, 0x00, 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+			readString32Helper(input, expect);
+		}
+
+		[Test]
+		[ExpectedException(typeof(IOException))]
+		public void testReadString32_UTF8Missing2ndByte()
+		{
+			// Test with bad UTF-8 encoding, missing 2nd byte of two byte value
+			byte[] input = { 0x00, 0x00, 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xC2, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+			MemoryStream stream = new MemoryStream(input);
+			EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+			reader.ReadString32();
+		}
+
+		[Test]
+		[ExpectedException(typeof(IOException))]
+		public void testReadString32_3byteEncodingMissingLastByte()
+		{
+			// Test with three byte encode that's missing a last byte.
+			byte[] input = { 0x00, 0x00, 0x00, 0x02, 0xE8, 0xA8 };
+
+			MemoryStream stream = new MemoryStream(input);
+			EndianBinaryReader reader = new EndianBinaryReader(stream);
+
+			reader.ReadString32();
+		}
+	}
+}