You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jg...@apache.org on 2016/09/10 23:09:21 UTC
svn commit: r1760211 [9/12] - in
/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk: ./ src/main/csharp/
src/main/csharp/Readers/ src/main/csharp/Selector/ src/test/csharp/
src/test/csharp/Commands/
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/StreamMessage.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,901 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Collections;
+
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+ public class StreamMessage : Message, IStreamMessage
+ {
+ // Reader over byteBuffer; created lazily by InitializeReading() and dropped by Reset()/ClearBody().
+ private EndianBinaryReader dataIn = null;
+ // Writer over byteBuffer; created lazily by InitializeWriting() and closed by StoreContent().
+ private EndianBinaryWriter dataOut = null;
+ // Backing stream used by whichever of dataIn/dataOut is currently active.
+ private MemoryStream byteBuffer = null;
+ // Bytes still unread in the byte-array field being consumed by ReadBytes(); -1 when no field is in progress.
+ private int bytesRemaining = -1;
+
+ /// <summary>
+ /// Reads the next field of the stream as a boolean.
+ /// </summary>
+ /// <remarks>
+ /// A BOOLEAN_TYPE field is returned as is and a STRING_TYPE field is run
+ /// through Boolean.Parse. For a NULL field, any other field type, or a
+ /// string that does not parse, the buffer is rewound to the start of the
+ /// field before the exception is raised.
+ /// </remarks>
+ /// <returns>The boolean value of the field.</returns>
+ public bool ReadBoolean()
+ {
+     InitializeReading();
+
+     try
+     {
+         long fieldStart = this.byteBuffer.Position;
+         try
+         {
+             int tag = this.dataIn.ReadByte();
+
+             if(tag == PrimitiveMap.BOOLEAN_TYPE)
+             {
+                 return this.dataIn.ReadBoolean();
+             }
+
+             if(tag == PrimitiveMap.STRING_TYPE)
+             {
+                 string literal = this.dataIn.ReadString16();
+                 return Boolean.Parse(literal);
+             }
+
+             // Not convertible: rewind to the start of the field, then fail.
+             this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+
+             if(tag == PrimitiveMap.NULL)
+             {
+                 throw new NMSException("Cannot convert Null type to a bool");
+             }
+
+             throw new MessageFormatException("Value is not a Boolean type.");
+         }
+         catch(FormatException ex)
+         {
+             this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(ex);
+         }
+     }
+     catch(EndOfStreamException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(ex);
+     }
+     catch(IOException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a byte.
+ /// </summary>
+ /// <remarks>
+ /// A BYTE_TYPE field is returned as is and a STRING_TYPE field is parsed
+ /// with Byte.Parse. For a NULL field, any other field type, or a string
+ /// that is malformed or out of range, the buffer is rewound to the start
+ /// of the field before the exception is raised.
+ /// </remarks>
+ /// <returns>The byte value of the field.</returns>
+ public byte ReadByte()
+ {
+     InitializeReading();
+
+     try
+     {
+         long startingPos = this.byteBuffer.Position;
+         try
+         {
+             int type = this.dataIn.ReadByte();
+
+             if(type == PrimitiveMap.BYTE_TYPE)
+             {
+                 return this.dataIn.ReadByte();
+             }
+             else if(type == PrimitiveMap.STRING_TYPE)
+             {
+                 return Byte.Parse(this.dataIn.ReadString16());
+             }
+             else if(type == PrimitiveMap.NULL)
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new NMSException("Cannot convert Null type to a byte");
+             }
+             else
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new MessageFormatException("Value is not a Byte type.");
+             }
+         }
+         catch(FormatException e)
+         {
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+         catch(OverflowException e)
+         {
+             // Byte.Parse reports out-of-range text with OverflowException rather
+             // than FormatException; treat it as the same conversion failure.
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+     }
+     catch(EndOfStreamException e)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(e);
+     }
+     catch(IOException e)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(e);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a char.
+ /// </summary>
+ /// <remarks>
+ /// Only a CHAR_TYPE field is accepted; there is no string conversion. For
+ /// a NULL field or any other field type the buffer is rewound to the start
+ /// of the field before the exception is raised.
+ /// </remarks>
+ /// <returns>The char value of the field.</returns>
+ public char ReadChar()
+ {
+     InitializeReading();
+
+     try
+     {
+         long fieldStart = this.byteBuffer.Position;
+         try
+         {
+             int tag = this.dataIn.ReadByte();
+
+             if(tag == PrimitiveMap.CHAR_TYPE)
+             {
+                 return this.dataIn.ReadChar();
+             }
+
+             // Not convertible: rewind to the start of the field, then fail.
+             this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+
+             if(tag == PrimitiveMap.NULL)
+             {
+                 throw new NMSException("Cannot convert Null type to a char");
+             }
+
+             throw new MessageFormatException("Value is not a Char type.");
+         }
+         catch(FormatException ex)
+         {
+             this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(ex);
+         }
+     }
+     catch(EndOfStreamException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(ex);
+     }
+     catch(IOException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a 16-bit integer.
+ /// </summary>
+ /// <remarks>
+ /// SHORT_TYPE and BYTE_TYPE fields are returned (the byte is widened) and
+ /// a STRING_TYPE field is parsed with Int16.Parse. For a NULL field, any
+ /// other field type, or a string that is malformed or out of range, the
+ /// buffer is rewound to the start of the field before the exception is
+ /// raised.
+ /// </remarks>
+ /// <returns>The short value of the field.</returns>
+ public short ReadInt16()
+ {
+     InitializeReading();
+
+     try
+     {
+         long startingPos = this.byteBuffer.Position;
+         try
+         {
+             int type = this.dataIn.ReadByte();
+
+             if(type == PrimitiveMap.SHORT_TYPE)
+             {
+                 return this.dataIn.ReadInt16();
+             }
+             else if(type == PrimitiveMap.BYTE_TYPE)
+             {
+                 return this.dataIn.ReadByte();
+             }
+             else if(type == PrimitiveMap.STRING_TYPE)
+             {
+                 return Int16.Parse(this.dataIn.ReadString16());
+             }
+             else if(type == PrimitiveMap.NULL)
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new NMSException("Cannot convert Null type to a short");
+             }
+             else
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new MessageFormatException("Value is not a Int16 type.");
+             }
+         }
+         catch(FormatException e)
+         {
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+         catch(OverflowException e)
+         {
+             // Int16.Parse reports out-of-range text with OverflowException rather
+             // than FormatException; treat it as the same conversion failure.
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+     }
+     catch(EndOfStreamException e)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(e);
+     }
+     catch(IOException e)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(e);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a 32-bit integer.
+ /// </summary>
+ /// <remarks>
+ /// INTEGER_TYPE, SHORT_TYPE and BYTE_TYPE fields are returned (narrower
+ /// values are widened) and a STRING_TYPE field is parsed with Int32.Parse.
+ /// For a NULL field, any other field type, or a string that is malformed
+ /// or out of range, the buffer is rewound to the start of the field before
+ /// the exception is raised.
+ /// </remarks>
+ /// <returns>The int value of the field.</returns>
+ public int ReadInt32()
+ {
+     InitializeReading();
+
+     try
+     {
+         long startingPos = this.byteBuffer.Position;
+         try
+         {
+             int type = this.dataIn.ReadByte();
+
+             if(type == PrimitiveMap.INTEGER_TYPE)
+             {
+                 return this.dataIn.ReadInt32();
+             }
+             else if(type == PrimitiveMap.SHORT_TYPE)
+             {
+                 return this.dataIn.ReadInt16();
+             }
+             else if(type == PrimitiveMap.BYTE_TYPE)
+             {
+                 return this.dataIn.ReadByte();
+             }
+             else if(type == PrimitiveMap.STRING_TYPE)
+             {
+                 return Int32.Parse(this.dataIn.ReadString16());
+             }
+             else if(type == PrimitiveMap.NULL)
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new NMSException("Cannot convert Null type to a int");
+             }
+             else
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new MessageFormatException("Value is not a Int32 type.");
+             }
+         }
+         catch(FormatException e)
+         {
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+         catch(OverflowException e)
+         {
+             // Int32.Parse reports out-of-range text with OverflowException rather
+             // than FormatException; treat it as the same conversion failure.
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+     }
+     catch(EndOfStreamException e)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(e);
+     }
+     catch(IOException e)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(e);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a 64-bit integer.
+ /// </summary>
+ /// <remarks>
+ /// LONG_TYPE, INTEGER_TYPE, SHORT_TYPE and BYTE_TYPE fields are returned
+ /// (narrower values are widened) and a STRING_TYPE field is parsed with
+ /// Int64.Parse. For a NULL field, any other field type, or a string that
+ /// is malformed or out of range, the buffer is rewound to the start of the
+ /// field before the exception is raised.
+ /// </remarks>
+ /// <returns>The long value of the field.</returns>
+ public long ReadInt64()
+ {
+     InitializeReading();
+
+     try
+     {
+         long startingPos = this.byteBuffer.Position;
+         try
+         {
+             int type = this.dataIn.ReadByte();
+
+             if(type == PrimitiveMap.LONG_TYPE)
+             {
+                 return this.dataIn.ReadInt64();
+             }
+             else if(type == PrimitiveMap.INTEGER_TYPE)
+             {
+                 return this.dataIn.ReadInt32();
+             }
+             else if(type == PrimitiveMap.SHORT_TYPE)
+             {
+                 return this.dataIn.ReadInt16();
+             }
+             else if(type == PrimitiveMap.BYTE_TYPE)
+             {
+                 return this.dataIn.ReadByte();
+             }
+             else if(type == PrimitiveMap.STRING_TYPE)
+             {
+                 return Int64.Parse(this.dataIn.ReadString16());
+             }
+             else if(type == PrimitiveMap.NULL)
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new NMSException("Cannot convert Null type to a long");
+             }
+             else
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new MessageFormatException("Value is not a Int64 type.");
+             }
+         }
+         catch(FormatException e)
+         {
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+         catch(OverflowException e)
+         {
+             // Int64.Parse reports out-of-range text with OverflowException rather
+             // than FormatException; treat it as the same conversion failure.
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+     }
+     catch(EndOfStreamException e)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(e);
+     }
+     catch(IOException e)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(e);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a single-precision float.
+ /// </summary>
+ /// <remarks>
+ /// A FLOAT_TYPE field is returned as is and a STRING_TYPE field is parsed
+ /// with Single.Parse. For a NULL field, any other field type, or a string
+ /// that cannot be converted, the buffer is rewound to the start of the
+ /// field before the exception is raised.
+ /// </remarks>
+ /// <returns>The float value of the field.</returns>
+ public float ReadSingle()
+ {
+     InitializeReading();
+
+     try
+     {
+         long startingPos = this.byteBuffer.Position;
+         try
+         {
+             int type = this.dataIn.ReadByte();
+
+             if(type == PrimitiveMap.FLOAT_TYPE)
+             {
+                 return this.dataIn.ReadSingle();
+             }
+             else if(type == PrimitiveMap.STRING_TYPE)
+             {
+                 return Single.Parse(this.dataIn.ReadString16());
+             }
+             else if(type == PrimitiveMap.NULL)
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new NMSException("Cannot convert Null type to a float");
+             }
+             else
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new MessageFormatException("Value is not a Single type.");
+             }
+         }
+         catch(FormatException e)
+         {
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+         catch(OverflowException e)
+         {
+             // On runtimes where Single.Parse throws OverflowException for
+             // out-of-range text, report it as the same conversion failure.
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+     }
+     catch(EndOfStreamException e)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(e);
+     }
+     catch(IOException e)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(e);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream as a double-precision float.
+ /// </summary>
+ /// <remarks>
+ /// DOUBLE_TYPE and FLOAT_TYPE fields are returned (the float is widened)
+ /// and a STRING_TYPE field is parsed with Double.Parse. For a NULL field,
+ /// any other field type, or a string that cannot be converted, the buffer
+ /// is rewound to the start of the field before the exception is raised.
+ /// </remarks>
+ /// <returns>The double value of the field.</returns>
+ public double ReadDouble()
+ {
+     InitializeReading();
+
+     try
+     {
+         long startingPos = this.byteBuffer.Position;
+         try
+         {
+             int type = this.dataIn.ReadByte();
+
+             if(type == PrimitiveMap.DOUBLE_TYPE)
+             {
+                 return this.dataIn.ReadDouble();
+             }
+             else if(type == PrimitiveMap.FLOAT_TYPE)
+             {
+                 return this.dataIn.ReadSingle();
+             }
+             else if(type == PrimitiveMap.STRING_TYPE)
+             {
+                 // Parse as a double: going through Single.Parse would drop
+                 // precision and reject values outside the float range.
+                 return Double.Parse(this.dataIn.ReadString16());
+             }
+             else if(type == PrimitiveMap.NULL)
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new NMSException("Cannot convert Null type to a double");
+             }
+             else
+             {
+                 this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+                 throw new MessageFormatException("Value is not a Double type.");
+             }
+         }
+         catch(FormatException e)
+         {
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+         catch(OverflowException e)
+         {
+             // On runtimes where Double.Parse throws OverflowException for
+             // out-of-range text, report it as the same conversion failure.
+             this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
+             throw NMSExceptionSupport.CreateMessageFormatException(e);
+         }
+     }
+     catch(EndOfStreamException e)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(e);
+     }
+     catch(IOException e)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(e);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream and returns it as a string.
+ /// </summary>
+ /// <remarks>
+ /// String fields (16- and 32-bit length variants) are returned as read,
+ /// numeric, char and boolean fields are converted with ToString(), and a
+ /// NULL field yields null. Any other type tag rewinds the buffer to the
+ /// start of the field and raises MessageFormatException.
+ /// </remarks>
+ /// <returns>The string form of the field, or null for a NULL field.</returns>
+ public string ReadString()
+ {
+     InitializeReading();
+
+     long fieldStart = this.byteBuffer.Position;
+
+     try
+     {
+         int tag = this.dataIn.ReadByte();
+
+         if(tag == PrimitiveMap.BIG_STRING_TYPE)
+         {
+             return this.dataIn.ReadString32();
+         }
+         if(tag == PrimitiveMap.STRING_TYPE)
+         {
+             return this.dataIn.ReadString16();
+         }
+         if(tag == PrimitiveMap.LONG_TYPE)
+         {
+             return this.dataIn.ReadInt64().ToString();
+         }
+         if(tag == PrimitiveMap.INTEGER_TYPE)
+         {
+             return this.dataIn.ReadInt32().ToString();
+         }
+         if(tag == PrimitiveMap.SHORT_TYPE)
+         {
+             return this.dataIn.ReadInt16().ToString();
+         }
+         if(tag == PrimitiveMap.FLOAT_TYPE)
+         {
+             return this.dataIn.ReadSingle().ToString();
+         }
+         if(tag == PrimitiveMap.DOUBLE_TYPE)
+         {
+             return this.dataIn.ReadDouble().ToString();
+         }
+         if(tag == PrimitiveMap.CHAR_TYPE)
+         {
+             return this.dataIn.ReadChar().ToString();
+         }
+         if(tag == PrimitiveMap.BYTE_TYPE)
+         {
+             return this.dataIn.ReadByte().ToString();
+         }
+         if(tag == PrimitiveMap.BOOLEAN_TYPE)
+         {
+             return this.dataIn.ReadBoolean().ToString();
+         }
+         if(tag == PrimitiveMap.NULL)
+         {
+             return null;
+         }
+
+         // Unhandled tag (byte arrays included): rewind, then fail.
+         this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+         throw new MessageFormatException("Value is not a known type.");
+     }
+     catch(FormatException ex)
+     {
+         this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+     catch(EndOfStreamException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(ex);
+     }
+     catch(IOException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+ }
+
+ /// <summary>
+ /// Copies the next chunk of a byte-array field into the supplied buffer.
+ /// </summary>
+ /// <remarks>
+ /// The first call on a field checks the BYTE_ARRAY_TYPE tag and reads the
+ /// field length; progress is then tracked in bytesRemaining across calls.
+ /// Once the field has been fully consumed, the following call returns -1
+ /// and resets the tracking so the next field can be read.
+ /// NOTE(review): the count returned by dataIn.Read in the full-buffer
+ /// path is ignored, so a truncated payload is not detected here.
+ /// </remarks>
+ /// <param name="value">Destination buffer; must not be null.</param>
+ /// <returns>
+ /// The number of bytes placed in the buffer, or -1 when the field had
+ /// already been fully read.
+ /// </returns>
+ public int ReadBytes(byte[] value)
+ {
+     InitializeReading();
+
+     if(value == null)
+     {
+         throw new NullReferenceException("Passed Byte Array is null");
+     }
+
+     try
+     {
+         if(this.bytesRemaining == 0)
+         {
+             // Previous calls drained the field; signal completion once.
+             this.bytesRemaining = -1;
+             return -1;
+         }
+
+         if(this.bytesRemaining == -1)
+         {
+             // Start of a new field: verify the tag and pick up the length.
+             long fieldStart = this.byteBuffer.Position;
+             byte tag = this.dataIn.ReadByte();
+
+             if(tag != PrimitiveMap.BYTE_ARRAY_TYPE)
+             {
+                 this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+                 throw new MessageFormatException("Not a byte array");
+             }
+
+             this.bytesRemaining = this.dataIn.ReadInt32();
+         }
+
+         int capacity = value.Length;
+
+         if(capacity > this.bytesRemaining)
+         {
+             // Buffer is larger than what is left: take the tail of the field.
+             int copied = this.dataIn.Read(value, 0, this.bytesRemaining);
+             this.bytesRemaining = 0;
+             return copied;
+         }
+
+         // Buffer is no larger than what is left: fill it completely.
+         this.bytesRemaining -= capacity;
+         this.dataIn.Read(value, 0, capacity);
+         return capacity;
+     }
+     catch(EndOfStreamException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(ex);
+     }
+     catch(IOException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+ }
+
+ /// <summary>
+ /// Reads the next field of the stream and returns it in its own type.
+ /// </summary>
+ /// <remarks>
+ /// Strings, numeric values, chars and booleans come back as the
+ /// corresponding (boxed) value, a byte-array field as a new byte[], and a
+ /// NULL field as null. Any other type tag rewinds the buffer to the start
+ /// of the field and raises MessageFormatException.
+ /// </remarks>
+ /// <returns>The field value, or null for a NULL field.</returns>
+ public Object ReadObject()
+ {
+     InitializeReading();
+
+     long fieldStart = this.byteBuffer.Position;
+
+     try
+     {
+         int tag = this.dataIn.ReadByte();
+
+         if(tag == PrimitiveMap.BIG_STRING_TYPE)
+         {
+             return this.dataIn.ReadString32();
+         }
+         if(tag == PrimitiveMap.STRING_TYPE)
+         {
+             return this.dataIn.ReadString16();
+         }
+         if(tag == PrimitiveMap.LONG_TYPE)
+         {
+             return this.dataIn.ReadInt64();
+         }
+         if(tag == PrimitiveMap.INTEGER_TYPE)
+         {
+             return this.dataIn.ReadInt32();
+         }
+         if(tag == PrimitiveMap.SHORT_TYPE)
+         {
+             return this.dataIn.ReadInt16();
+         }
+         if(tag == PrimitiveMap.FLOAT_TYPE)
+         {
+             return this.dataIn.ReadSingle();
+         }
+         if(tag == PrimitiveMap.DOUBLE_TYPE)
+         {
+             return this.dataIn.ReadDouble();
+         }
+         if(tag == PrimitiveMap.CHAR_TYPE)
+         {
+             return this.dataIn.ReadChar();
+         }
+         if(tag == PrimitiveMap.BYTE_TYPE)
+         {
+             return this.dataIn.ReadByte();
+         }
+         if(tag == PrimitiveMap.BOOLEAN_TYPE)
+         {
+             return this.dataIn.ReadBoolean();
+         }
+         if(tag == PrimitiveMap.BYTE_ARRAY_TYPE)
+         {
+             // Length-prefixed payload: read the size, then the bytes.
+             int payloadSize = this.dataIn.ReadInt32();
+             byte[] payload = new byte[payloadSize];
+             this.dataIn.Read(payload, 0, payloadSize);
+             return payload;
+         }
+         if(tag == PrimitiveMap.NULL)
+         {
+             return null;
+         }
+
+         // Unknown tag: rewind, then fail.
+         this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+         throw new MessageFormatException("Value is not a known type.");
+     }
+     catch(FormatException ex)
+     {
+         this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+     catch(EndOfStreamException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageEOFException(ex);
+     }
+     catch(IOException ex)
+     {
+         throw NMSExceptionSupport.CreateMessageFormatException(ex);
+     }
+ }
+
+ /// <summary>
+ /// Appends a boolean field: the BOOLEAN_TYPE tag followed by the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteBoolean(bool value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a byte field: the BYTE_TYPE tag followed by the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteByte(byte value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.BYTE_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends the whole array as a single byte-array field by delegating to
+ /// the offset/length overload.
+ /// </summary>
+ /// <param name="value">The bytes to write.</param>
+ public void WriteBytes(byte[] value)
+ {
+     InitializeWriting();
+
+     int count = value.Length;
+     this.WriteBytes(value, 0, count);
+ }
+
+ /// <summary>
+ /// Appends a byte-array field: the BYTE_ARRAY_TYPE tag, the length as a
+ /// 32-bit integer, then <paramref name="length"/> bytes of
+ /// <paramref name="value"/> starting at <paramref name="offset"/>.
+ /// </summary>
+ /// <param name="value">Source array.</param>
+ /// <param name="offset">Index of the first byte to write.</param>
+ /// <param name="length">Number of bytes to write.</param>
+ public void WriteBytes(byte[] value, int offset, int length)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE);
+         this.dataOut.Write((int) length);
+         this.dataOut.Write(value, offset, length);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a char field: the CHAR_TYPE tag followed by the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteChar(char value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.CHAR_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a 16-bit integer field: the SHORT_TYPE tag followed by the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteInt16(short value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.SHORT_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a 32-bit integer field: the INTEGER_TYPE tag followed by the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteInt32(int value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.INTEGER_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a 64-bit integer field: the LONG_TYPE tag followed by the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteInt64(long value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.LONG_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a single-precision float field: the FLOAT_TYPE tag followed by
+ /// the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteSingle(float value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.FLOAT_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a double-precision float field: the DOUBLE_TYPE tag followed by
+ /// the value.
+ /// </summary>
+ /// <param name="value">The value to write.</param>
+ public void WriteDouble(double value)
+ {
+     InitializeWriting();
+     try
+     {
+         this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE);
+         this.dataOut.Write(value);
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a string field.
+ /// </summary>
+ /// <remarks>
+ /// Strings longer than 8192 characters are written with the
+ /// BIG_STRING_TYPE tag and WriteString32; shorter ones with STRING_TYPE
+ /// and WriteString16. A null string is written as a bare NULL tag, which
+ /// ReadString() and ReadObject() return as null.
+ /// </remarks>
+ /// <param name="value">The string to write; may be null.</param>
+ public void WriteString(string value)
+ {
+     InitializeWriting();
+     try
+     {
+         if( value == null )
+         {
+             // Cast keeps this a one-byte tag, matching the ReadByte() on the read side.
+             this.dataOut.Write((byte) PrimitiveMap.NULL);
+         }
+         else if( value.Length > 8192 )
+         {
+             this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE);
+             this.dataOut.WriteString32(value);
+         }
+         else
+         {
+             this.dataOut.Write(PrimitiveMap.STRING_TYPE);
+             this.dataOut.WriteString16(value);
+         }
+     }
+     catch(IOException e)
+     {
+         // Throw the wrapped exception; previously it was built and discarded,
+         // which silently swallowed the write failure.
+         throw NMSExceptionSupport.Create(e);
+     }
+ }
+
+ /// <summary>
+ /// Appends a field whose wire type is chosen from the runtime type of
+ /// <paramref name="value"/>.
+ /// </summary>
+ /// <remarks>
+ /// Supported types are byte, char, bool, short, int, long, float, double,
+ /// byte[] and string; each is forwarded to the matching Write method. A
+ /// null reference is written as a bare NULL tag, which ReadObject() and
+ /// ReadString() return as null. Any other type raises
+ /// MessageFormatException.
+ /// </remarks>
+ /// <param name="value">The value to write; may be null.</param>
+ public void WriteObject(Object value)
+ {
+     InitializeWriting();
+
+     if( value == null )
+     {
+         // Previously null fell through to the error branch and crashed on
+         // value.GetType(); write the NULL tag that the readers understand.
+         try
+         {
+             // Cast keeps this a one-byte tag, matching the ReadByte() on the read side.
+             this.dataOut.Write((byte) PrimitiveMap.NULL);
+         }
+         catch(IOException e)
+         {
+             throw NMSExceptionSupport.Create(e);
+         }
+         return;
+     }
+
+     if( value is System.Byte )
+     {
+         this.WriteByte( (byte) value );
+     }
+     else if( value is Char )
+     {
+         this.WriteChar( (char) value );
+     }
+     else if( value is Boolean )
+     {
+         this.WriteBoolean( (bool) value );
+     }
+     else if( value is Int16 )
+     {
+         this.WriteInt16( (short) value );
+     }
+     else if( value is Int32 )
+     {
+         this.WriteInt32( (int) value );
+     }
+     else if( value is Int64 )
+     {
+         this.WriteInt64( (long) value );
+     }
+     else if( value is Single )
+     {
+         this.WriteSingle( (float) value );
+     }
+     else if( value is Double )
+     {
+         this.WriteDouble( (double) value );
+     }
+     else if( value is byte[] )
+     {
+         this.WriteBytes( (byte[]) value );
+     }
+     else if( value is String )
+     {
+         this.WriteString( (string) value );
+     }
+     else
+     {
+         throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+     }
+ }
+
+ /// <summary>
+ /// Flushes any pending writes into Content and then returns the base
+ /// class's clone of this message.
+ /// </summary>
+ /// <returns>The object produced by base.Clone().</returns>
+ public override Object Clone()
+ {
+     // Pending writer output must be stored in Content before copying.
+     StoreContent();
+
+     Object copy = base.Clone();
+     return copy;
+ }
+
+ /// <summary>
+ /// Clears the message body via the base class and discards all local
+ /// reader/writer state.
+ /// </summary>
+ public override void ClearBody()
+ {
+     base.ClearBody();
+
+     // Drop both stream wrappers and their buffer; no byte-array field is in progress.
+     this.dataIn = null;
+     this.dataOut = null;
+     this.byteBuffer = null;
+     this.bytesRemaining = -1;
+ }
+
+ /// <summary>
+ /// Stores any pending writes into Content, discards the reader/writer
+ /// state and marks the body read-only so the next read starts from the
+ /// beginning of the stored content.
+ /// </summary>
+ public void Reset()
+ {
+     StoreContent();
+
+     this.byteBuffer = null;
+     this.dataOut = null;
+     this.dataIn = null;
+     this.bytesRemaining = -1;
+
+     this.ReadOnlyBody = true;
+ }
+
+ /// <summary>
+ /// Prepares the message for reading: runs the write-only check and, on
+ /// first use, wraps Content in a read-only stream and reader.
+ /// </summary>
+ /// <remarks>
+ /// A null Content (a message with no body) is treated as an empty buffer,
+ /// so the first read hits end-of-stream and is reported through the Read
+ /// methods' normal exception mapping, instead of the MemoryStream
+ /// constructor throwing ArgumentNullException.
+ /// </remarks>
+ private void InitializeReading()
+ {
+     FailIfWriteOnlyBody();
+     if(this.dataIn == null)
+     {
+         byte[] body = this.Content ?? new byte[0];
+         this.byteBuffer = new MemoryStream(body, false);
+         this.dataIn = new EndianBinaryReader(this.byteBuffer);
+     }
+ }
+
+ /// <summary>
+ /// Prepares the message for writing: runs the read-only check and, on
+ /// first use, creates a fresh in-memory stream with a writer over it.
+ /// </summary>
+ private void InitializeWriting()
+ {
+     FailIfReadOnlyBody();
+
+     if(this.dataOut != null)
+     {
+         // Writer already set up by an earlier call.
+         return;
+     }
+
+     this.byteBuffer = new MemoryStream();
+     this.dataOut = new EndianBinaryWriter(this.byteBuffer);
+ }
+
+ /// <summary>
+ /// If a writer is active, closes it, copies the buffered bytes into
+ /// Content and drops the writer and its buffer. Does nothing otherwise.
+ /// </summary>
+ private void StoreContent()
+ {
+     if(this.dataOut == null)
+     {
+         return;
+     }
+
+     this.dataOut.Close();
+
+     // The buffer is snapshotted after the writer has been closed.
+     this.Content = this.byteBuffer.ToArray();
+     this.dataOut = null;
+     this.byteBuffer = null;
+ }
+
+ }
+}
+
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempDestination.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Commands
+{
+ /// <summary>
+ /// Abstract base for temporary destinations; concrete subclasses supply
+ /// the destination type.
+ /// </summary>
+ public abstract class TempDestination : Destination
+ {
+     public TempDestination() : base()
+     {
+     }
+
+     public TempDestination(String name) : base(name)
+     {
+     }
+
+     /// <summary>
+     /// The destination type; must be provided by the concrete subclass.
+     /// </summary>
+     public abstract override DestinationType DestinationType
+     {
+         get;
+     }
+
+     /// <summary>
+     /// Base implementation of the destination factory method.
+     /// </summary>
+     /// <param name="name">Name for the destination (unused here).</param>
+     /// <returns>Always null in this class.</returns>
+     public override Destination CreateDestination(String name)
+     {
+         return null;
+     }
+
+     /// <summary>
+     /// Returns the base class's clone cast to TempDestination. This class
+     /// adds no state of its own that would need copying.
+     /// </summary>
+     public override Object Clone()
+     {
+         return (TempDestination) base.Clone();
+     }
+
+     /// <summary>
+     /// Not supported; always throws NotSupportedException.
+     /// </summary>
+     public void Delete()
+     {
+         // NOTE(review): the message names Stomp although this file sits in the
+         // Apache.NMS.MSMQ tree - confirm the wording is intended.
+         throw new NotSupportedException("Stomp Cannot Delete Temporary Destinations");
+     }
+ }
+}
+
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempQueue.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+ /// <summary>
+ /// Temporary queue destination.
+ /// </summary>
+ public class TempQueue : TempDestination, ITemporaryQueue
+ {
+     public TempQueue() : base()
+     {
+     }
+
+     public TempQueue(String name) : base(name)
+     {
+     }
+
+     /// <summary>
+     /// Always DestinationType.TemporaryQueue.
+     /// </summary>
+     public override DestinationType DestinationType
+     {
+         get { return DestinationType.TemporaryQueue; }
+     }
+
+     /// <summary>
+     /// The queue name, taken from PhysicalName.
+     /// </summary>
+     public String QueueName
+     {
+         get
+         {
+             return PhysicalName;
+         }
+     }
+
+     /// <summary>
+     /// Method form of <see cref="QueueName"/>.
+     /// </summary>
+     public String GetQueueName()
+     {
+         return PhysicalName;
+     }
+
+     /// <summary>
+     /// Returns the TEMPORARY_QUEUE constant.
+     /// </summary>
+     public override int GetDestinationType()
+     {
+         return TEMPORARY_QUEUE;
+     }
+
+     /// <summary>
+     /// Creates a new TempQueue with the given name.
+     /// </summary>
+     /// <param name="name">Name of the new queue.</param>
+     public override Destination CreateDestination(String name)
+     {
+         return new TempQueue(name);
+     }
+
+     /// <summary>
+     /// Returns the base class's clone cast to TempQueue. This class adds
+     /// no state of its own that would need copying.
+     /// </summary>
+     public override Object Clone()
+     {
+         return (TempQueue) base.Clone();
+     }
+ }
+}
+
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TempTopic.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+
+ /// <summary>
+ /// Temporary topic destination.
+ /// </summary>
+ public class TempTopic : TempDestination, ITemporaryTopic
+ {
+     public TempTopic()
+         : base()
+     {
+     }
+
+     public TempTopic(String name)
+         : base(name)
+     {
+     }
+
+     /// <summary>
+     /// Always DestinationType.TemporaryTopic.
+     /// </summary>
+     public override DestinationType DestinationType
+     {
+         get
+         {
+             return DestinationType.TemporaryTopic;
+         }
+     }
+
+     /// <summary>
+     /// The topic name, taken from PhysicalName.
+     /// </summary>
+     public String TopicName
+     {
+         get
+         {
+             return PhysicalName;
+         }
+     }
+
+     /// <summary>
+     /// Method form of <see cref="TopicName"/>.
+     /// </summary>
+     public String GetTopicName()
+     {
+         return PhysicalName;
+     }
+
+     /// <summary>
+     /// Returns the TEMPORARY_TOPIC constant.
+     /// </summary>
+     public override int GetDestinationType()
+     {
+         return TEMPORARY_TOPIC;
+     }
+
+     /// <summary>
+     /// Creates a new TempTopic with the given name.
+     /// </summary>
+     /// <param name="name">Name of the new topic.</param>
+     public override Destination CreateDestination(String name)
+     {
+         return new TempTopic(name);
+     }
+
+     /// <summary>
+     /// Returns the base class's clone cast to TempTopic. This class adds
+     /// no state of its own that would need copying.
+     /// </summary>
+     public override Object Clone()
+     {
+         return (TempTopic) base.Clone();
+     }
+ }
+}
+
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/TextMessage.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+ /// <summary>
+ /// Message whose body is a single string.
+ /// </summary>
+ public class TextMessage : Message, ITextMessage
+ {
+     // Current body text; null when no text has been set or after ClearBody().
+     private String text = null;
+
+     public TextMessage()
+     {
+     }
+
+     /// <summary>
+     /// Creates a message and assigns <paramref name="text"/> through the
+     /// Text property.
+     /// </summary>
+     public TextMessage(String text)
+     {
+         this.Text = text;
+     }
+
+     /// <summary>
+     /// The body text. Setting it runs the read-only check first and also
+     /// resets Content to null.
+     /// </summary>
+     public string Text
+     {
+         get
+         {
+             return this.text;
+         }
+         set
+         {
+             FailIfReadOnlyBody();
+             this.text = value;
+             this.Content = null;
+         }
+     }
+
+     /// <summary>
+     /// Clears the body via the base class and forgets the text.
+     /// </summary>
+     public override void ClearBody()
+     {
+         base.ClearBody();
+         this.text = null;
+     }
+
+     /// <summary>
+     /// Returns the base description followed by the text. Text longer than
+     /// 63 characters is abbreviated to its first 45 characters, "...", and
+     /// its last 12 characters; null text is shown as "null".
+     /// </summary>
+     public override string ToString()
+     {
+         string preview = this.Text;
+
+         if(preview == null)
+         {
+             preview = "null";
+         }
+         else if(preview.Length > 63)
+         {
+             string head = preview.Substring(0, 45);
+             string tail = preview.Substring(preview.Length - 12);
+             preview = head + "..." + tail;
+         }
+
+         return base.ToString() + " Text = " + preview;
+     }
+ }
+}
+
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/Commands/Topic.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+
+ /// <summary>
+ /// Topic destination.
+ /// </summary>
+ public class Topic : Destination, ITopic
+ {
+     public Topic()
+         : base()
+     {
+     }
+
+     public Topic(String name)
+         : base(name)
+     {
+     }
+
+     /// <summary>
+     /// Always DestinationType.Topic.
+     /// </summary>
+     public override DestinationType DestinationType
+     {
+         get { return DestinationType.Topic; }
+     }
+
+     /// <summary>
+     /// The topic name, taken from PhysicalName.
+     /// </summary>
+     public String TopicName
+     {
+         get
+         {
+             return PhysicalName;
+         }
+     }
+
+     /// <summary>
+     /// Returns the TOPIC constant.
+     /// </summary>
+     public override int GetDestinationType()
+     {
+         return TOPIC;
+     }
+
+     /// <summary>
+     /// Creates a new Topic with the given name.
+     /// </summary>
+     /// <param name="name">Name of the new topic.</param>
+     public override Destination CreateDestination(String name)
+     {
+         return new Topic(name);
+     }
+
+     /// <summary>
+     /// Returns the base class's clone cast to Topic. This class adds no
+     /// state of its own that would need copying.
+     /// </summary>
+     public override Object Clone()
+     {
+         return (Topic) base.Clone();
+     }
+ }
+}
+
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConnectionTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+ //[TestFixture]
+ public class ConnectionTest : NMSTest
+ {
+ IConnection startedConnection = null;
+ IConnection stoppedConnection = null;
+
+ // Protected, so only derived classes can construct this test class.
+ // testSupport is passed straight through to the NMSTest base constructor.
+ protected ConnectionTest(NMSTestSupport testSupport)
+ : base(testSupport)
+ {
+ }
+
+ //[SetUp]
+ public override void SetUp()
+ {
+     base.SetUp();
+
+     // One connection is started here; the other is created but left un-started.
+     // TestStoppedConsumerHoldsMessagesTillStarted uses the pair.
+     this.startedConnection = CreateConnection(null);
+     this.startedConnection.Start();
+
+     this.stoppedConnection = CreateConnection(null);
+ }
+
+ //[TearDown]
+ public override void TearDown()
+ {
+     // Close both fixture connections and always run the base teardown, even when
+     // one of the Close() calls throws or a connection was never created.
+     try
+     {
+         try
+         {
+             if(startedConnection != null)
+             {
+                 startedConnection.Close();
+             }
+         }
+         finally
+         {
+             if(stoppedConnection != null)
+             {
+                 stoppedConnection.Close();
+             }
+         }
+     }
+     finally
+     {
+         // Drop the references so a later test cannot reuse a closed connection.
+         startedConnection = null;
+         stoppedConnection = null;
+
+         base.TearDown();
+     }
+ }
+
+ /// <summary>
+ /// Verify that it is possible to create multiple connections to the broker.
+ /// There was a bug in the connection factory which set the clientId member which made
+ /// it impossible to create an additional connection.
+ /// </summary>
+ //[Test]
+ public virtual void TestTwoConnections()
+ {
+     // Open and start a second connection while the first is still open and started.
+     using(IConnection first = CreateConnection(null))
+     {
+         first.Start();
+
+         using(IConnection second = CreateConnection(null))
+         {
+             // With the clientId bug present, this Start() call throws.
+             second.Start();
+         }
+     }
+ }
+
+ //[Test]
+ public virtual void TestCreateAndDisposeWithConsumer(
+     //[Values(true, false)]
+     bool disposeConsumer, string testDestRef)
+ {
+     // Creates a consumer, stops the connection, and optionally disposes the consumer
+     // explicitly before the session and connection are disposed by their using blocks.
+     using(IConnection conn = CreateConnection("DisposalTestConnection"))
+     {
+         conn.Start();
+
+         using(ISession sess = conn.CreateSession())
+         {
+             IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+             IMessageConsumer createdConsumer = sess.CreateConsumer(dest);
+
+             conn.Stop();
+
+             // The consumer is intentionally not in a using block: when disposeConsumer
+             // is false it is left for the enclosing disposals.
+             if(disposeConsumer)
+             {
+                 createdConsumer.Dispose();
+             }
+         }
+     }
+ }
+
+ //[Test]
+ public virtual void TestCreateAndDisposeWithProducer(
+     //[Values(true, false)]
+     bool disposeProducer, string testDestRef)
+ {
+     // Creates a producer, stops the connection, and optionally disposes the producer
+     // explicitly before the session and connection are disposed by their using blocks.
+     using(IConnection conn = CreateConnection("DisposalTestConnection"))
+     {
+         conn.Start();
+
+         using(ISession sess = conn.CreateSession())
+         {
+             IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+             IMessageProducer createdProducer = sess.CreateProducer(dest);
+
+             conn.Stop();
+
+             // The producer is intentionally not in a using block: when disposeProducer
+             // is false it is left for the enclosing disposals.
+             if(disposeProducer)
+             {
+                 createdProducer.Dispose();
+             }
+         }
+     }
+ }
+
+ //[Test]
+ public virtual void TestStartAfterSend(
+     //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+     MsgDeliveryMode deliveryMode,
+     string testDestRef)
+ {
+     // Sends one message on a connection that has not been started yet, then starts
+     // the connection and checks that exactly one message is delivered.
+     using(IConnection conn = CreateConnection(GetTestClientId()))
+     {
+         ISession sess = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
+         IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+         IMessageConsumer receiver = sess.CreateConsumer(dest);
+
+         // Send before the connection is started.
+         SendMessages(sess, dest, deliveryMode, 1);
+
+         // Start the connection only after the message was sent.
+         conn.Start();
+
+         // Exactly one message: the first receive succeeds, the second finds nothing.
+         IMessage first = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNotNull(first);
+
+         IMessage second = receiver.ReceiveNoWait();
+         Assert.IsNull(second);
+     }
+ }
+
+ /// <summary>
+ /// Tests if the consumer receives the messages that were sent before the
+ /// connection was started.
+ /// </summary>
+ //[Test]
+ public virtual void TestStoppedConsumerHoldsMessagesTillStarted(string testDestRef)
+ {
+     // The sessions are created on the fixture-level connections, which outlive this
+     // method (TestMultipleConnectionStops calls it repeatedly). They are closed in a
+     // finally block so a failed assertion does not leave sessions and consumers
+     // attached to those connections.
+     ISession startedSession = null;
+     ISession stoppedSession = null;
+
+     try
+     {
+         startedSession = startedConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+         stoppedSession = stoppedConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+         // Setup the consumers.
+         IDestination destination = GetClearDestinationByNodeReference(startedSession, testDestRef);
+         IMessageConsumer startedConsumer = startedSession.CreateConsumer(destination);
+         IMessageConsumer stoppedConsumer = stoppedSession.CreateConsumer(destination);
+
+         // Send the message.
+         IMessageProducer producer = startedSession.CreateProducer(destination);
+         ITextMessage message = startedSession.CreateTextMessage("Hello");
+         producer.Send(message);
+
+         // Test the assertions.
+         IMessage m;
+         if(destination.IsTopic)
+         {
+             // Only checked for topics: the started consumer is expected to receive its copy.
+             m = startedConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+             Assert.IsNotNull(m);
+         }
+
+         // The consumer on the un-started connection must not receive anything yet.
+         m = stoppedConsumer.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNull(m);
+
+         // Once its connection is started, the held message must arrive.
+         stoppedConnection.Start();
+         m = stoppedConsumer.Receive(TimeSpan.FromMilliseconds(5000));
+         Assert.IsNotNull(m);
+     }
+     finally
+     {
+         try
+         {
+             if(startedSession != null)
+             {
+                 startedSession.Close();
+             }
+         }
+         finally
+         {
+             if(stoppedSession != null)
+             {
+                 stoppedSession.Close();
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Tests if the consumer is able to receive messages even when the
+ /// connection restarts multiple times.
+ /// </summary>
+ //[Test]
+ public virtual void TestMultipleConnectionStops(string testDestRef)
+ {
+     // Runs the held-message scenario three times, stopping the connection again
+     // between runs (each run starts it).
+     const int ROUNDS = 3;
+
+     for(int round = 0; round < ROUNDS; round++)
+     {
+         if(round > 0)
+         {
+             stoppedConnection.Stop();
+         }
+
+         TestStoppedConsumerHoldsMessagesTillStarted(testDestRef);
+     }
+ }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/ConsumerTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+ //[TestFixture]
+ public class ConsumerTest : NMSTest
+ {
+ protected const int COUNT = 25;
+ protected const string VALUE_NAME = "value";
+
+ private bool dontAck;
+
+ // Protected, so only derived classes can construct this test class.
+ // testSupport is passed straight through to the NMSTest base constructor.
+ protected ConsumerTest(NMSTestSupport testSupport)
+ : base(testSupport)
+ {
+ }
+
+// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
+#if !NETCF
+ //[Test]
+ public virtual void TestNoTimeoutConsumer(
+     string testDestRef,
+     //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+     // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+     AcknowledgementMode ackMode)
+ {
+     // IMessageConsumer.Receive() runs on a worker thread. The test passes when that
+     // thread is still blocked after three seconds; finishing earlier is a failure.
+     Thread worker = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+     using(IConnection connection = CreateConnection())
+     {
+         connection.Start();
+         using(ISession session = connection.CreateSession(ackMode))
+         {
+             IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+             using(this.timeoutConsumer = session.CreateConsumer(destination))
+             {
+                 worker.Start();
+
+                 bool returnedEarly = worker.Join(3000);
+                 if(!returnedEarly)
+                 {
+                     try
+                     {
+                         // Try to release the worker - otherwise it sits in Receive() until a message arrives.
+                         worker.Interrupt();
+                         // MSMQ MessageQueue.Receive is interrupted neither by Thread.Interrupt, nor by
+                         // Thread.Abort. Send a dummy message to stop the background thread.
+                         IMessageProducer unblocker = session.CreateProducer(destination);
+                         unblocker.Send(session.CreateMessage());
+                     }
+                     catch
+                     {
+                         // Best effort only: the test has already passed at this point.
+                     }
+                 }
+                 else
+                 {
+                     Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed.");
+                 }
+             }
+         }
+     }
+ }
+
+ protected IMessageConsumer timeoutConsumer;
+
+ // Thread body used by TestNoTimeoutConsumer and TestSyncReceiveConsumerClose:
+ // calls the blocking, no-argument Receive() on the shared timeoutConsumer field.
+ // NOTE(review): the Assert.Fail calls below throw on this worker thread, not on the
+ // test thread - confirm how the test runner treats an exception escaping a worker.
+ protected void TimeoutConsumerThreadProc()
+ {
+ try
+ {
+ timeoutConsumer.Receive();
+ }
+ catch(ArgumentOutOfRangeException e)
+ {
+ // The test failed. This thread ends early, so the calling test's Join(3000) returns true
+ // and the test reports that Receive() returned without blocking.
+ Assert.Fail("Test failed with exception: " + e.Message);
+ }
+ catch(ThreadInterruptedException)
+ {
+ // The test succeeded! We were still blocked when we were interrupted.
+ }
+ catch(Exception e)
+ {
+ // Some other exception occurred.
+ Assert.Fail("Test failed with exception: " + e.Message);
+ }
+ }
+
+ //[Test]
+ public virtual void TestSyncReceiveConsumerClose(
+     string testDestRef,
+     //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+     // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+     AcknowledgementMode ackMode)
+ {
+     // Launch a thread that blocks in IMessageConsumer.Receive(), then verify that
+     // closing the consumer from this thread makes that Receive() call return.
+     Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+     using (IConnection connection = CreateConnection())
+     {
+         connection.Start();
+         using (ISession session = connection.CreateSession(ackMode))
+         {
+             IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+             using (this.timeoutConsumer = session.CreateConsumer(destination))
+             {
+                 receiveThread.Start();
+                 if (receiveThread.Join(3000))
+                 {
+                     Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed.");
+                 }
+                 else
+                 {
+                     // Closing the consumer is expected to release the blocked Receive().
+                     this.timeoutConsumer.Close();
+                     receiveThread.Join(10000);
+                     if (receiveThread.IsAlive)
+                     {
+                         // Unblock the stuck thread BEFORE failing. Assert.Fail throws, so
+                         // anything placed after it would never run and the thread would
+                         // stay blocked.
+                         try
+                         {
+                             receiveThread.Interrupt();
+                             // MSMQ MessageQueue.Receive is interrupted neither by Thread.Interrupt, nor by
+                             // Thread.Abort. Send a dummy message to stop the background thread.
+                             IMessageProducer producer = session.CreateProducer(destination);
+                             producer.Send(session.CreateMessage());
+                         }
+                         catch
+                         {
+                             // Best-effort cleanup; the failure is reported just below.
+                         }
+
+                         Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it.");
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+ // Argument bundle handed to DelayedProducerThreadProc through Thread.Start(object).
+ // Filled in by TestConsumerReceiveBeforeMessageDispatched.
+ internal class ThreadArg
+ {
+ internal IConnection connection = null;
+ internal ISession session = null;
+ internal IDestination destination = null;
+ }
+
+ /// <summary>
+ /// Thread body that waits five seconds and then sends one text message to the
+ /// destination given in the <see cref="ThreadArg"/>.
+ /// </summary>
+ /// <param name="arg">A <see cref="ThreadArg"/>; its connection and destination are used.</param>
+ protected void DelayedProducerThreadProc(Object arg)
+ {
+     try
+     {
+         // Direct cast: a wrong argument type fails here with a clear InvalidCastException
+         // rather than a NullReferenceException on first use.
+         ThreadArg args = (ThreadArg) arg;
+
+         using(ISession session = args.connection.CreateSession())
+         {
+             using(IMessageProducer producer = session.CreateProducer(args.destination))
+             {
+                 // Give the consumer time to enter the receive.
+                 Thread.Sleep(5000);
+
+                 // Create the message from this thread's own session. args.session belongs
+                 // to the test thread, which is using it at the same time.
+                 producer.Send(session.CreateTextMessage("Hello World"));
+             }
+         }
+     }
+     catch(Exception e)
+     {
+         // Some other exception occurred.
+         Assert.Fail("Test failed with exception: " + e.Message);
+     }
+ }
+
+ //[Test]
+ public virtual void TestDoChangeSentMessage(
+     string testDestRef,
+     //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+     // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+     AcknowledgementMode ackMode,
+     //[Values(true, false)]
+     bool doClear)
+ {
+     // Reuses one ITextMessage instance for COUNT sends, changing (and optionally
+     // clearing) it after each send, then checks that every received message still
+     // carries the text and property it had when it was sent.
+     using(IConnection connection = CreateConnection())
+     {
+         connection.Start();
+         using(ISession session = connection.CreateSession(ackMode))
+         {
+             IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+             using(IMessageConsumer consumer = session.CreateConsumer(destination))
+             {
+                 IMessageProducer producer = session.CreateProducer(destination);
+                 ITextMessage message = session.CreateTextMessage();
+
+                 string prefix = "ConsumerTest - TestDoChangeSentMessage: ";
+
+                 for(int i = 0; i < COUNT; i++)
+                 {
+                     message.Properties[VALUE_NAME] = i;
+                     message.Text = prefix + Convert.ToString(i);
+
+                     producer.Send(message);
+
+                     if(doClear)
+                     {
+                         message.ClearBody();
+                         message.ClearProperties();
+                     }
+                 }
+
+                 if(ackMode == AcknowledgementMode.Transactional)
+                 {
+                     session.Commit();
+                 }
+
+                 for(int i = 0; i < COUNT; i++)
+                 {
+                     ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;
+
+                     // Fail with a readable message, not a NullReferenceException, when the
+                     // receive timed out or returned a non-text message.
+                     Assert.IsNotNull(msg, "Did not receive text message #" + Convert.ToString(i));
+
+                     // NUnit's AreEqual takes (expected, actual).
+                     Assert.AreEqual(prefix + Convert.ToString(i), msg.Text);
+                     Assert.AreEqual(i, msg.Properties.GetInt(VALUE_NAME));
+                 }
+
+                 if(ackMode == AcknowledgementMode.Transactional)
+                 {
+                     session.Commit();
+                 }
+             }
+         }
+     }
+ }
+
+ //[Test]
+ public virtual void TestConsumerReceiveBeforeMessageDispatched(
+     string testDestRef,
+     //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+     // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+     AcknowledgementMode ackMode)
+ {
+     // Enters Receive() before any message exists; a background thread sends the
+     // message after a delay (see DelayedProducerThreadProc).
+     Thread sendThread = new Thread(DelayedProducerThreadProc);
+     using(IConnection connection = CreateConnection())
+     {
+         connection.Start();
+         using(ISession session = connection.CreateSession(ackMode))
+         {
+             IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+             using(IMessageConsumer consumer = session.CreateConsumer(destination))
+             {
+                 ThreadArg arg = new ThreadArg();
+
+                 arg.connection = connection;
+                 arg.session = session;
+                 arg.destination = destination;
+
+                 sendThread.Start(arg);
+                 try
+                 {
+                     IMessage message = consumer.Receive(TimeSpan.FromMinutes(1));
+                     Assert.IsNotNull(message);
+                 }
+                 finally
+                 {
+                     // Wait for the sender to finish with its producer and session before the
+                     // using blocks dispose the connection it is working on. Bounded so a
+                     // stuck sender cannot hang the test run.
+                     sendThread.Join(TimeSpan.FromSeconds(30));
+                 }
+             }
+         }
+     }
+ }
+
+ //[Test]
+ public virtual void TestDontStart(
+     string testDestRef,
+     //[Values(MsgDeliveryMode.NonPersistent)]
+     MsgDeliveryMode deliveryMode)
+ {
+     // The connection is never started, so the consumer must not be handed the message.
+     using(IConnection conn = CreateConnection())
+     {
+         ISession sess = conn.CreateSession();
+         IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+         IMessageConsumer receiver = sess.CreateConsumer(dest);
+
+         // Send a single message.
+         SendMessages(sess, dest, deliveryMode, 1);
+
+         // Nothing may be delivered within the wait period.
+         IMessage delivered = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNull(delivered);
+     }
+ }
+
+ //[Test]
+ // Walks one message through a transacted session: it must not be delivered before
+ // Commit(), must be delivered once after it, must be redelivered (flagged
+ // NMSRedelivered) after Rollback(), and must not reappear after the final Commit().
+ public virtual void TestSendReceiveTransacted(
+ string testDestRef,
+ //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+ MsgDeliveryMode deliveryMode)
+ {
+ using(IConnection connection = CreateConnection())
+ {
+ // Send a message to the broker.
+ connection.Start();
+ ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ IMessageProducer producer = session.CreateProducer(destination);
+
+ producer.DeliveryMode = deliveryMode;
+ producer.Send(session.CreateTextMessage("Test"));
+
+ // Message should not be delivered until commit.
+ Thread.Sleep(1000);
+ Assert.IsNull(consumer.ReceiveNoWait());
+ session.Commit();
+
+ // Make sure only 1 message was delivered.
+ IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(message);
+ Assert.IsFalse(message.NMSRedelivered);
+ Assert.IsNull(consumer.ReceiveNoWait());
+
+ // Message should be redelivered if rollback is used.
+ session.Rollback();
+
+ // Make sure only 1 message was redelivered.
+ message = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+ Assert.IsNotNull(message);
+ Assert.IsTrue(message.NMSRedelivered);
+ Assert.IsNull(consumer.ReceiveNoWait());
+
+ // If we commit now, the message should not be redelivered.
+ session.Commit();
+ Thread.Sleep(1000);
+ Assert.IsNull(consumer.ReceiveNoWait());
+ }
+ }
+
+ //[Test]
+ public virtual void TestAckedMessageAreConsumed(string testDestRef)
+ {
+     // A message acknowledged in one client-ack session must not show up again in a
+     // new session on the same connection.
+     using(IConnection conn = CreateConnection())
+     {
+         conn.Start();
+
+         ISession sess = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
+         IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+         IMessageProducer sender = sess.CreateProducer(dest);
+         sender.Send(sess.CreateTextMessage("Hello"));
+
+         // Receive and acknowledge the message.
+         IMessageConsumer receiver = sess.CreateConsumer(dest);
+         IMessage received = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNotNull(received);
+         received.Acknowledge();
+
+         // Replace the session with a fresh one.
+         sess.Close();
+         sess = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+         // The acknowledged message must be gone.
+         receiver = sess.CreateConsumer(dest);
+         received = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNull(received);
+
+         sess.Close();
+     }
+ }
+
+ //[Test]
+ public virtual void TestLastMessageAcked(string testDestRef)
+ {
+     // Sends three messages, receives all three, acknowledges only the last one,
+     // and checks that a new session finds nothing left to receive.
+     using(IConnection conn = CreateConnection())
+     {
+         conn.Start();
+
+         ISession sess = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
+         IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+         IMessageProducer sender = sess.CreateProducer(dest);
+
+         string[] bodies = { "Hello", "Hello2", "Hello3" };
+         foreach(string body in bodies)
+         {
+             sender.Send(sess.CreateTextMessage(body));
+         }
+
+         // Receive every message; only the final one is acknowledged.
+         IMessageConsumer receiver = sess.CreateConsumer(dest);
+         IMessage received = null;
+         for(int i = 0; i < bodies.Length; i++)
+         {
+             received = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+             Assert.IsNotNull(received);
+         }
+         received.Acknowledge();
+
+         // Replace the session with a fresh one.
+         sess.Close();
+         sess = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+         // Nothing should be left to receive.
+         receiver = sess.CreateConsumer(dest);
+         received = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNull(received);
+
+         sess.Close();
+     }
+ }
+
+ //[Test]
+ public virtual void TestUnAckedMessageAreNotConsumedOnSessionClose(string testDestRef)
+ {
+     // A message that was received but never acknowledged must be delivered again
+     // to a new session after the first session is closed.
+     using(IConnection conn = CreateConnection())
+     {
+         conn.Start();
+
+         ISession sess = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
+         IDestination dest = GetClearDestinationByNodeReference(sess, testDestRef);
+         IMessageProducer sender = sess.CreateProducer(dest);
+         sender.Send(sess.CreateTextMessage("Hello"));
+
+         // Receive the message but deliberately skip Acknowledge().
+         IMessageConsumer receiver = sess.CreateConsumer(dest);
+         IMessage received = receiver.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNotNull(received);
+
+         // Closing the session should cause the unacknowledged message to be re-delivered.
+         sess.Close();
+         sess = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+         // The message must come back; acknowledge it this time.
+         receiver = sess.CreateConsumer(dest);
+         received = receiver.Receive(TimeSpan.FromMilliseconds(2000));
+         Assert.IsNotNull(received);
+         received.Acknowledge();
+
+         sess.Close();
+     }
+ }
+
+ //[Test]
+ public virtual void TestAsyncAckedMessageAreConsumed(string testDestRef)
+ {
+     // This test needs OnMessage to acknowledge. dontAck is an instance field that
+     // another test sets to true, so set it explicitly here and do not depend on
+     // test execution order.
+     dontAck = false;
+
+     using(IConnection connection = CreateConnection())
+     {
+         connection.Start();
+         ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+         IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+         IMessageProducer producer = session.CreateProducer(destination);
+         producer.Send(session.CreateTextMessage("Hello"));
+
+         // Consume the message asynchronously; OnMessage acknowledges it.
+         IMessageConsumer consumer = session.CreateConsumer(destination);
+         consumer.Listener += new MessageListener(OnMessage);
+
+         // Give the listener time to be invoked.
+         Thread.Sleep(5000);
+
+         // Reset the session.
+         session.Close();
+
+         session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+         // Attempt to consume the message again; it must be gone.
+         consumer = session.CreateConsumer(destination);
+         IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+         Assert.IsNull(msg);
+
+         session.Close();
+     }
+ }
+
+ //[Test]
+ public virtual void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose(string testDestRef)
+ {
+     // dontAck is instance state shared with every other test that uses OnMessage.
+     // It is restored in the finally block so later tests are not silently switched
+     // into "never acknowledge" mode.
+     try
+     {
+         using(IConnection connection = CreateConnection())
+         {
+             connection.Start();
+             // Don't acknowledge the message in the OnMessage() callback.
+             dontAck = true;
+             ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+             IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+             IMessageProducer producer = session.CreateProducer(destination);
+             producer.Send(session.CreateTextMessage("Hello"));
+
+             // Consume the message...
+             using(IMessageConsumer consumer = session.CreateConsumer(destination))
+             {
+                 consumer.Listener += new MessageListener(OnMessage);
+                 // Don't ack the message.
+             }
+
+             // Reset the session. This should cause the unacked message to be
+             // redelivered.
+             session.Close();
+
+             Thread.Sleep(5000);
+             session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+             // Attempt to consume the message...
+             using(IMessageConsumer consumer = session.CreateConsumer(destination))
+             {
+                 IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                 Assert.IsNotNull(msg);
+                 msg.Acknowledge();
+             }
+
+             session.Close();
+         }
+     }
+     finally
+     {
+         dontAck = false;
+     }
+ }
+
+ //[Test]
+ // Attaches, detaches and re-attaches OnMessage as the consumer's listener, then closes
+ // the consumer. There are no assertions; the test passes when none of these calls throw.
+ // NOTE(review): the destType parameter is not used in this method body.
+ public virtual void TestAddRemoveAsnycMessageListener(DestinationType destType, string testDestRef)
+ {
+ using(IConnection connection = CreateConnection())
+ {
+ connection.Start();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+ IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ consumer.Listener += OnMessage;
+ consumer.Listener -= OnMessage;
+ consumer.Listener += OnMessage;
+
+ consumer.Close();
+ }
+ }
+
+ /// <summary>
+ /// Listener used by the asynchronous tests: asserts the message is not null and,
+ /// unless dontAck is set, acknowledges it, ignoring any exception from Acknowledge().
+ /// </summary>
+ /// <param name="message">The delivered message.</param>
+ public void OnMessage(IMessage message)
+ {
+     Assert.IsNotNull(message);
+
+     if(dontAck)
+     {
+         // The current test wants the message left unacknowledged.
+         return;
+     }
+
+     try
+     {
+         message.Acknowledge();
+     }
+     catch(Exception)
+     {
+         // Deliberately ignored.
+     }
+ }
+
+ //[Test]
+ public virtual void TestReceiveNoWait(
+     string testDestRef,
+     //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+     // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+     AcknowledgementMode ackMode,
+     //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+     MsgDeliveryMode deliveryMode)
+ {
+     // Sends one message, then polls ReceiveNoWait() up to RETRIES times
+     // (100 ms apart) until it is returned.
+     const int RETRIES = 20;
+
+     using(IConnection connection = CreateConnection())
+     {
+         connection.Start();
+         using(ISession session = connection.CreateSession(ackMode))
+         {
+             IDestination destination = GetClearDestinationByNodeReference(session, testDestRef);
+
+             using(IMessageProducer producer = session.CreateProducer(destination))
+             {
+                 producer.DeliveryMode = deliveryMode;
+                 ITextMessage message = session.CreateTextMessage("TEST");
+                 producer.Send(message);
+
+                 if(AcknowledgementMode.Transactional == ackMode)
+                 {
+                     session.Commit();
+                 }
+             }
+
+             using(IMessageConsumer consumer = session.CreateConsumer(destination))
+             {
+                 IMessage message = null;
+
+                 for(int i = 0; i < RETRIES; ++i)
+                 {
+                     message = consumer.ReceiveNoWait();
+                     if(message != null)
+                     {
+                         // Got it: no need to sleep before leaving the loop.
+                         break;
+                     }
+
+                     Thread.Sleep(100);
+                 }
+
+                 Assert.IsNotNull(message);
+                 message.Acknowledge();
+
+                 if(AcknowledgementMode.Transactional == ackMode)
+                 {
+                     session.Commit();
+                 }
+             }
+         }
+     }
+ }
+
+#endif
+
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/DurableTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+ //[TestFixture]
+ public class DurableTest : NMSTest
+ {
+ protected static string DURABLE_SELECTOR = "2 > 1";
+
+ protected string TEST_CLIENT_AND_CONSUMER_ID;
+ protected string SEND_CLIENT_ID;
+
+ // Protected, so only derived classes can construct this test class.
+ // testSupport is passed straight through to the NMSTest base constructor.
+ protected DurableTest(NMSTestSupport testSupport)
+ : base(testSupport)
+ {
+ }
+
+ //[SetUp]
+ public override void SetUp()
+ {
+     base.SetUp();
+
+     // Both ids come from GetTestClientId(): one for the consuming connection and
+     // durable subscription, one for the sending connection.
+     this.TEST_CLIENT_AND_CONSUMER_ID = GetTestClientId();
+     this.SEND_CLIENT_ID = GetTestClientId();
+ }
+
+ //[Test]
+ public virtual void TestSendWhileClosed(
+     //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+     // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+     AcknowledgementMode ackMode, string testTopicRef)
+ {
+     // Creates a durable consumer, disposes it, sends a message while it is closed,
+     // then re-creates the durable consumer and expects to receive that message.
+     try
+     {
+         using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+         {
+             connection.Start();
+
+             using(ISession session = connection.CreateSession(ackMode))
+             {
+                 ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef);
+                 IMessageProducer producer = session.CreateProducer(topic);
+
+                 producer.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                 // The consuming session is disposed here, not left for the connection to clean up.
+                 using(ISession consumeSession = connection.CreateSession(ackMode))
+                 {
+                     IMessageConsumer consumer = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+                     Thread.Sleep(1000);
+                     consumer.Dispose();
+                     consumer = null;
+
+                     ITextMessage message = session.CreateTextMessage("DurableTest-TestSendWhileClosed");
+                     message.Properties.SetString("test", "test");
+                     message.NMSType = "test";
+                     producer.Send(message);
+                     if(AcknowledgementMode.Transactional == ackMode)
+                     {
+                         session.Commit();
+                     }
+
+                     Thread.Sleep(1000);
+                     using(IMessageConsumer reopened = consumeSession.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, null, false))
+                     {
+                         ITextMessage msg = reopened.Receive(TimeSpan.FromMilliseconds(1000)) as ITextMessage;
+
+                         // Check for null BEFORE touching msg, so a missing message is reported
+                         // as such and not as a NullReferenceException from Acknowledge().
+                         Assert.IsNotNull(msg, "Did not receive the message sent while the durable consumer was closed.");
+
+                         msg.Acknowledge();
+                         if(AcknowledgementMode.Transactional == ackMode)
+                         {
+                             consumeSession.Commit();
+                         }
+
+                         // NUnit's AreEqual takes (expected, actual).
+                         Assert.AreEqual("DurableTest-TestSendWhileClosed", msg.Text);
+                         Assert.AreEqual("test", msg.NMSType);
+                         Assert.AreEqual("test", msg.Properties.GetString("test"));
+                     }
+                 }
+             }
+         }
+     }
+     catch(Exception ex)
+     {
+         Assert.Fail(ex.Message);
+     }
+     finally
+     {
+         // Pause to allow Stomp to unregister at the broker.
+         Thread.Sleep(500);
+
+         UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+     }
+ }
+
+ //[Test]
+ // Creates a durable consumer with selector color='red', receives a red message, then
+ // re-creates the durable consumer under the same name with selector color='blue' and
+ // checks that only the blue message ("3rd") is delivered.
+ // Any exception, including a failed assertion, is reported through Assert.Fail(ex.Message).
+ public void TestDurableConsumerSelectorChange(
+ //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+ // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+ AcknowledgementMode ackMode, string testTopicRef)
+ {
+ try
+ {
+ using(IConnection connection = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+ {
+ connection.Start();
+ using(ISession session = connection.CreateSession(ackMode))
+ {
+ ITopic topic = (ITopic)GetClearDestinationByNodeReference(session, testTopicRef);
+ IMessageProducer producer = session.CreateProducer(topic);
+ IMessageConsumer consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='red'", false);
+
+ producer.DeliveryMode = MsgDeliveryMode.Persistent;
+
+ // Send the messages
+ ITextMessage sendMessage = session.CreateTextMessage("1st");
+ sendMessage.Properties["color"] = "red";
+ producer.Send(sendMessage);
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+
+ ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
+ Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
+ Assert.AreEqual("1st", receiveMsg.Text);
+ Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
+ receiveMsg.Acknowledge();
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+
+ // Change the subscription, allowing some time for the Broker to purge the
+ // consumers resources.
+ consumer.Dispose();
+ Thread.Sleep(1000);
+
+ consumer = session.CreateDurableConsumer(topic, TEST_CLIENT_AND_CONSUMER_ID, "color='blue'", false);
+
+ // "2nd" is red and "3rd" is blue; only "3rd" matches the new selector.
+ sendMessage = session.CreateTextMessage("2nd");
+ sendMessage.Properties["color"] = "red";
+ producer.Send(sendMessage);
+ sendMessage = session.CreateTextMessage("3rd");
+ sendMessage.Properties["color"] = "blue";
+ producer.Send(sendMessage);
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+
+ // Selector should skip the 2nd message.
+ receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
+ Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
+ Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
+ Assert.AreEqual(MsgDeliveryMode.Persistent, receiveMsg.NMSDeliveryMode, "NMSDeliveryMode does not match");
+ receiveMsg.Acknowledge();
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+
+ // Make sure there are no pending messages.
+ Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ Assert.Fail(ex.Message);
+ }
+ finally
+ {
+ // Pause to allow Stomp to unregister at the broker.
+ Thread.Sleep(500);
+
+ UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+ }
+ }
+
+ //[Test]
+ // Registers a durable consumer, runs RunTestDurableConsumer once (twice when ackMode
+ // is Transactional), and always unregisters the durable consumer afterwards.
+ public void TestDurableConsumer(
+ //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+ // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+ AcknowledgementMode ackMode, string testDurableTopicName)
+ {
+ try
+ {
+ RegisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, testDurableTopicName, TEST_CLIENT_AND_CONSUMER_ID, null, false);
+ RunTestDurableConsumer(testDurableTopicName, ackMode);
+ // In transactional mode the scenario is repeated a second time.
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ RunTestDurableConsumer(testDurableTopicName, ackMode);
+ }
+ }
+ finally
+ {
+ // Pause to allow Stomp to unregister at the broker.
+ Thread.Sleep(500);
+
+ UnregisterDurableConsumer(TEST_CLIENT_AND_CONSUMER_ID, TEST_CLIENT_AND_CONSUMER_ID);
+ }
+ }
+
+ /// <summary>
+ /// Sends two durable messages to the topic, then opens a durable consumer and
+ /// expects to receive and acknowledge both of them.
+ /// </summary>
+ /// <param name="topicName">Name of the topic to send to and consume from.</param>
+ /// <param name="ackMode">Acknowledgement mode of the consuming session.</param>
+ protected void RunTestDurableConsumer(string topicName, AcknowledgementMode ackMode)
+ {
+     // One failure message per expected delivery, in order.
+     string[] missingMessageErrors =
+     {
+         "Did not receive first durable message.",
+         "Did not receive second durable message."
+     };
+
+     for(int sent = 0; sent < missingMessageErrors.Length; sent++)
+     {
+         SendDurableMessage(topicName);
+     }
+
+     using(IConnection conn = CreateConnection(TEST_CLIENT_AND_CONSUMER_ID))
+     {
+         conn.Start();
+         using(ISession sess = conn.CreateSession(ackMode))
+         {
+             ITopic durableTopic = SessionUtil.GetTopic(sess, topicName);
+             using(IMessageConsumer durableConsumer = sess.CreateDurableConsumer(durableTopic, TEST_CLIENT_AND_CONSUMER_ID, null, false))
+             {
+                 foreach(string error in missingMessageErrors)
+                 {
+                     IMessage received = durableConsumer.Receive(receiveTimeout);
+                     Assert.IsNotNull(received, error);
+                     received.Acknowledge();
+                 }
+
+                 if(ackMode == AcknowledgementMode.Transactional)
+                 {
+                     sess.Commit();
+                 }
+             }
+         }
+     }
+ }
+
+ /// <summary>
+ /// Opens a connection under SEND_CLIENT_ID and sends one persistent
+ /// "Durable Hello" text message to the named topic.
+ /// </summary>
+ /// <param name="topicName">Name of the topic to send to.</param>
+ protected void SendDurableMessage(string topicName)
+ {
+     using(IConnection conn = CreateConnection(SEND_CLIENT_ID))
+     {
+         conn.Start();
+         using(ISession sess = conn.CreateSession())
+         {
+             ITopic target = SessionUtil.GetTopic(sess, topicName);
+             using(IMessageProducer sender = sess.CreateProducer(target))
+             {
+                 ITextMessage hello = sess.CreateTextMessage("Durable Hello");
+
+                 sender.DeliveryMode = MsgDeliveryMode.Persistent;
+                 sender.Send(hello);
+             }
+         }
+     }
+ }
+ }
+}
Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs?rev=1760211&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/test/csharp/EndianBinaryReaderTest.cs Sat Sep 10 23:09:20 2016
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+    /// <summary>
+    /// Unit tests for <c>EndianBinaryReader.ReadString16</c> and
+    /// <c>EndianBinaryReader.ReadString32</c>.
+    /// </summary>
+    /// <remarks>
+    /// Every input buffer starts with a length prefix (two bytes in the String16
+    /// tests, four bytes in the String32 tests). In the well-formed cases its value
+    /// equals the number of encoded bytes that follow. In the test data the NUL
+    /// character is encoded as the two-byte sequence 0xC0 0x80.
+    /// </remarks>
+    [TestFixture]
+    public class EndianBinaryReaderTest
+    {
+        /// <summary>
+        /// Wraps <paramref name="input"/> in a memory stream and a reader on top of it.
+        /// The caller owns the returned reader and should dispose it.
+        /// </summary>
+        /// <param name="input">Raw bytes the reader will decode.</param>
+        /// <returns>A reader positioned at the start of <paramref name="input"/>.</returns>
+        private static EndianBinaryReader CreateReader(byte[] input)
+        {
+            return new EndianBinaryReader(new MemoryStream(input));
+        }
+
+        /// <summary>
+        /// Asserts that <paramref name="actual"/> consists of exactly the characters in
+        /// <paramref name="expect"/>.
+        /// </summary>
+        /// <param name="expect">Characters the decoded string must contain, in order.</param>
+        /// <param name="actual">String returned by the reader.</param>
+        private static void AssertCharsMatch(char[] expect, string actual)
+        {
+            Assert.IsNotNull(actual, "Reader returned a null string.");
+
+            // Check the length first: without it, extra trailing characters would go
+            // unnoticed and a short result would fail with an index error instead of
+            // a readable assertion message.
+            Assert.AreEqual(expect.Length, actual.Length, "Decoded string has an unexpected length.");
+
+            for(int i = 0; i < expect.Length; ++i)
+            {
+                Assert.AreEqual(expect[i], actual[i], "Unexpected character at index " + i + ".");
+            }
+        }
+
+        /// <summary>
+        /// Decodes <paramref name="input"/> with <c>ReadString16</c> and verifies the result.
+        /// </summary>
+        /// <param name="input">Length-prefixed encoded bytes.</param>
+        /// <param name="expect">Characters the decoded string must equal.</param>
+        public void readString16Helper(byte[] input, char[] expect)
+        {
+            using(EndianBinaryReader reader = CreateReader(input))
+            {
+                AssertCharsMatch(expect, reader.ReadString16());
+            }
+        }
+
+        [Test]
+        public void testReadString16_1byteUTF8encoding()
+        {
+            // Test data with 1-byte UTF8 encoding.
+            char[] expect = { '\u0000', '\u000B', '\u0048', '\u0065', '\u006C', '\u006C', '\u006F', '\u0020', '\u0057', '\u006F', '\u0072', '\u006C', '\u0064' };
+            byte[] input = { 0x00, 0x0E, 0xC0, 0x80, 0x0B, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 0x6F, 0x72, 0x6C, 0x64 };
+
+            readString16Helper(input, expect);
+        }
+
+        [Test]
+        public void testReadString16_2byteUTF8encoding()
+        {
+            // Test data with 2-byte UTF8 encoding.
+            char[] expect = { '\u0000', '\u00C2', '\u00A9', '\u00C3', '\u00A6' };
+            byte[] input = { 0x00, 0x0A, 0xC0, 0x80, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC2, 0xA6 };
+
+            readString16Helper(input, expect);
+        }
+
+        [Test]
+        public void testReadString16_1byteAnd2byteEmbeddedNULLs()
+        {
+            // Test data with 1-byte and 2-byte encoding with embedded NULLs.
+            char[] expect = { '\u0000', '\u0004', '\u00C2', '\u00A9', '\u00C3', '\u0000', '\u00A6' };
+            byte[] input = { 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+            readString16Helper(input, expect);
+        }
+
+        [Test]
+        [ExpectedException(typeof(IOException))]
+        public void testReadString16_UTF8Missing2ndByte()
+        {
+            // Bad UTF-8 encoding: a two-byte lead (0xC2) is followed by another
+            // lead byte (0xC2) instead of its second byte.
+            byte[] input = { 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xC2, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+            using(EndianBinaryReader reader = CreateReader(input))
+            {
+                reader.ReadString16();
+            }
+        }
+
+        [Test]
+        [ExpectedException(typeof(IOException))]
+        public void testReadString16_3byteEncodingMissingLastByte()
+        {
+            // Three-byte encoding (lead 0xE8) that is missing its last byte.
+            byte[] input = { 0x00, 0x02, 0xE8, 0xA8 };
+
+            using(EndianBinaryReader reader = CreateReader(input))
+            {
+                reader.ReadString16();
+            }
+        }
+
+        /// <summary>
+        /// Decodes <paramref name="input"/> with <c>ReadString32</c> and verifies the result.
+        /// </summary>
+        /// <param name="input">Length-prefixed encoded bytes.</param>
+        /// <param name="expect">Characters the decoded string must equal.</param>
+        public void readString32Helper(byte[] input, char[] expect)
+        {
+            using(EndianBinaryReader reader = CreateReader(input))
+            {
+                AssertCharsMatch(expect, reader.ReadString32());
+            }
+        }
+
+        [Test]
+        public void testReadString32_1byteUTF8encoding()
+        {
+            // Test data with 1-byte UTF8 encoding.
+            char[] expect = { '\u0000', '\u000B', '\u0048', '\u0065', '\u006C', '\u006C', '\u006F', '\u0020', '\u0057', '\u006F', '\u0072', '\u006C', '\u0064' };
+            byte[] input = { 0x00, 0x00, 0x00, 0x0E, 0xC0, 0x80, 0x0B, 0x48, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x57, 0x6F, 0x72, 0x6C, 0x64 };
+
+            readString32Helper(input, expect);
+        }
+
+        [Test]
+        public void testReadString32_2byteUTF8encoding()
+        {
+            // Test data with 2-byte UTF8 encoding.
+            char[] expect = { '\u0000', '\u00C2', '\u00A9', '\u00C3', '\u00A6' };
+            byte[] input = { 0x00, 0x00, 0x00, 0x0A, 0xC0, 0x80, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC2, 0xA6 };
+
+            readString32Helper(input, expect);
+        }
+
+        [Test]
+        public void testReadString32_1byteAnd2byteEmbeddedNULLs()
+        {
+            // Test data with 1-byte and 2-byte encoding with embedded NULLs.
+            char[] expect = { '\u0000', '\u0004', '\u00C2', '\u00A9', '\u00C3', '\u0000', '\u00A6' };
+            byte[] input = { 0x00, 0x00, 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xA9, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+            readString32Helper(input, expect);
+        }
+
+        [Test]
+        [ExpectedException(typeof(IOException))]
+        public void testReadString32_UTF8Missing2ndByte()
+        {
+            // Bad UTF-8 encoding: a two-byte lead (0xC2) is followed by another
+            // lead byte (0xC2) instead of its second byte.
+            byte[] input = { 0x00, 0x00, 0x00, 0x0D, 0xC0, 0x80, 0x04, 0xC3, 0x82, 0xC2, 0xC2, 0xC3, 0x83, 0xC0, 0x80, 0xC2, 0xA6 };
+
+            using(EndianBinaryReader reader = CreateReader(input))
+            {
+                reader.ReadString32();
+            }
+        }
+
+        [Test]
+        [ExpectedException(typeof(IOException))]
+        public void testReadString32_3byteEncodingMissingLastByte()
+        {
+            // Three-byte encoding (lead 0xE8) that is missing its last byte.
+            byte[] input = { 0x00, 0x00, 0x00, 0x02, 0xE8, 0xA8 };
+
+            using(EndianBinaryReader reader = CreateReader(input))
+            {
+                reader.ReadString32();
+            }
+        }
+    }
+}