You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/08 13:45:12 UTC
[06/13] activemq-nms-xms git commit: Initial check-in of new Apache.NMS.XMS provider implementation. Big thanks to Stéphane Ramet for the implementation! Fixes [AMQNET-185]. (See https://issues.apache.org/jira/browse/AMQNET-185)
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/AsyncConsumeTest.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/AsyncConsumeTest.cs b/src/test/csharp/AsyncConsumeTest.cs
new file mode 100644
index 0000000..c414927
--- /dev/null
+++ b/src/test/csharp/AsyncConsumeTest.cs
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+ //[TestFixture]
+ public class AsyncConsumeTest : NMSTest
+ {
+ protected string RESPONSE_CLIENT_ID;
+ protected AutoResetEvent semaphore;
+ protected bool received;
+ protected IMessage receivedMsg;
+
+ public AsyncConsumeTest(NMSTestSupport testSupport)
+ : base(testSupport)
+ {
+ }
+
+ //[SetUp]
+ public override void SetUp()
+ {
+ base.SetUp();
+ semaphore = new AutoResetEvent(false);
+ received = false;
+ receivedMsg = null;
+
+ RESPONSE_CLIENT_ID = GetTestClientId() + ":RESPONSE";
+ }
+
+ /// <summary>
+ /// Releases the per-test state created in SetUp and then runs the base teardown.
+ /// </summary>
+ //[TearDown]
+ public override void TearDown()
+ {
+     receivedMsg = null;
+
+     // SetUp allocates a fresh AutoResetEvent for every test; close it here so
+     // its wait handle is not leaked from one test to the next.
+     if(semaphore != null)
+     {
+         semaphore.Close();
+         semaphore = null;
+     }
+
+     base.TearDown();
+ }
+
+ //[Test]
+ public virtual void TestAsynchronousConsume(
+ //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+ MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+ // IBM XMS doesn't support both synchronous and asynchronous operations
+ // in the same session. Needs 2 separate sessions.
+ using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
+ using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(IDestination producerDestination = GetClearDestination(producerSession, DestinationType.Queue, testQueueRef))
+ using(IDestination consumerDestination = GetDestination(consumerSession, DestinationType.Queue, testQueueRef))
+ using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination))
+ using(IMessageProducer producer = producerSession.CreateProducer(producerDestination))
+ {
+ producer.DeliveryMode = deliveryMode;
+ consumer.Listener += new MessageListener(OnMessage);
+
+ IMessage request = producerSession.CreateMessage();
+ request.NMSCorrelationID = "AsyncConsume";
+ request.NMSType = "Test";
+ producer.Send(request);
+
+ WaitForMessageToArrive();
+ Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
+ }
+ }
+
+ //[Test]
+ public virtual void TestCreateConsumerAfterSend(
+ //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+ MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+ using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
+ using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(IDestination producerDestination = GetClearDestination(producerSession, DestinationType.Queue, testQueueRef))
+ using(IDestination consumerDestination = GetDestination(consumerSession, DestinationType.Queue, testQueueRef))
+ {
+ string correlationId = "AsyncConsumeAfterSend";
+
+ using(IMessageProducer producer = producerSession.CreateProducer(producerDestination))
+ {
+ producer.DeliveryMode = deliveryMode;
+ IMessage request = producerSession.CreateMessage();
+ request.NMSCorrelationID = correlationId;
+ request.NMSType = "Test";
+ producer.Send(request);
+ }
+
+ using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination))
+ {
+ consumer.Listener += new MessageListener(OnMessage);
+ WaitForMessageToArrive();
+ Assert.AreEqual(correlationId, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
+ }
+ }
+ }
+
+ //[Test]
+ public virtual void TestCreateConsumerBeforeSendAddListenerAfterSend(
+ //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+ MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+ using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
+ using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(IDestination producerDestination = GetClearDestination(producerSession, DestinationType.Queue, testQueueRef))
+ using(IDestination consumerDestination = GetDestination(consumerSession, DestinationType.Queue, testQueueRef))
+ using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination))
+ using(IMessageProducer producer = producerSession.CreateProducer(producerDestination))
+ {
+ producer.DeliveryMode = deliveryMode;
+
+ IMessage request = producerSession.CreateMessage();
+ request.NMSCorrelationID = "AsyncConsumeAfterSendLateListener";
+ request.NMSType = "Test";
+ producer.Send(request);
+
+ // now lets add the listener
+ consumer.Listener += new MessageListener(OnMessage);
+ WaitForMessageToArrive();
+ Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
+ }
+ }
+
+ //[Test]
+ public virtual void TestAsynchronousTextMessageConsume(
+ //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+ MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+ using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
+ using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(IDestination producerDestination = GetClearDestination(producerSession, DestinationType.Queue, testQueueRef))
+ using(IDestination consumerDestination = GetDestination(consumerSession, DestinationType.Queue, testQueueRef))
+ using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination))
+ using(IMessageProducer producer = producerSession.CreateProducer(producerDestination))
+ {
+ consumer.Listener += new MessageListener(OnMessage);
+ producer.DeliveryMode = deliveryMode;
+
+ ITextMessage request = producerSession.CreateTextMessage("Hello, World!");
+ request.NMSCorrelationID = "AsyncConsumeTextMessage";
+ request.Properties["NMSXGroupID"] = "cheese";
+ request.Properties["myHeader"] = "James";
+
+ producer.Send(request);
+
+ WaitForMessageToArrive();
+ Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
+ Assert.AreEqual(request.Properties["NMSXGroupID"], receivedMsg.Properties["NMSXGroupID"], "Invalid NMSXGroupID.");
+ Assert.AreEqual(request.Properties["myHeader"], receivedMsg.Properties["myHeader"], "Invalid myHeader.");
+ Assert.AreEqual(request.Text, ((ITextMessage) receivedMsg).Text, "Invalid text body.");
+ }
+ }
+
+ //[Test]
+ public virtual void TestTemporaryQueueAsynchronousConsume(
+ //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+ MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+ using(IConnection connection = CreateConnectionAndStart(GetTestClientId()))
+ using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ITemporaryQueue tempReplyDestination = session.CreateTemporaryQueue())
+ using(IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef))
+ using(IMessageConsumer consumer = session.CreateConsumer(destination))
+ using(IMessageConsumer tempConsumer = session.CreateConsumer(tempReplyDestination))
+ using(IMessageProducer producer = session.CreateProducer(destination))
+ {
+ producer.DeliveryMode = deliveryMode;
+ tempConsumer.Listener += new MessageListener(OnMessage);
+ consumer.Listener += new MessageListener(OnQueueMessage);
+
+ IMessage request = session.CreateMessage();
+ request.NMSCorrelationID = "TemqQueueAsyncConsume";
+ request.NMSType = "Test";
+ request.NMSReplyTo = tempReplyDestination;
+ producer.Send(request);
+
+ WaitForMessageToArrive();
+ Assert.AreEqual("TempQueueAsyncResponse", receivedMsg.NMSCorrelationID, "Invalid correlation ID.");
+ }
+ }
+
+ protected void OnQueueMessage(IMessage message)
+ {
+ Assert.AreEqual("TemqQueueAsyncConsume", message.NMSCorrelationID, "Invalid correlation ID.");
+ using(IConnection connection = CreateConnectionAndStart(RESPONSE_CLIENT_ID))
+ using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(IMessageProducer producer = session.CreateProducer(message.NMSReplyTo))
+ {
+ producer.DeliveryMode = message.NMSDeliveryMode;
+
+ ITextMessage response = session.CreateTextMessage("Asynchronous Response Message Text");
+ response.NMSCorrelationID = "TempQueueAsyncResponse";
+ response.NMSType = message.NMSType;
+ producer.Send(response);
+ }
+ }
+
+ /// <summary>
+ /// Message listener used by the tests: records the delivered message, marks
+ /// it as received and signals the semaphore.
+ /// </summary>
+ /// <param name="message">The message handed to the listener.</param>
+ protected void OnMessage(IMessage message)
+ {
+ receivedMsg = message;
+ received = true;
+ // Signal last, so that the state above is already set when
+ // WaitForMessageToArrive wakes up on the semaphore.
+ semaphore.Set();
+ }
+
+ /// <summary>
+ /// Waits on the semaphore for at most receiveTimeout and then asserts that
+ /// the received flag has been set.
+ /// </summary>
+ protected void WaitForMessageToArrive()
+ {
+     int timeoutInMilliseconds = (int) receiveTimeout.TotalMilliseconds;
+
+     // The result of the wait is not inspected; the received flag decides the outcome.
+     semaphore.WaitOne(timeoutInMilliseconds, true);
+     Assert.IsTrue(received, "Should have received a message by now!");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/BadConsumeTest.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/BadConsumeTest.cs b/src/test/csharp/BadConsumeTest.cs
new file mode 100644
index 0000000..564f9d8
--- /dev/null
+++ b/src/test/csharp/BadConsumeTest.cs
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+ //[TestFixture]
+ public class BadConsumeTest : NMSTest
+ {
+ protected IConnection connection;
+ protected ISession session;
+
+ protected BadConsumeTest(NMSTestSupport testSupport)
+ : base(testSupport)
+ {
+ }
+
+ //[SetUp]
+ public override void SetUp()
+ {
+ connection = CreateConnection(GetTestClientId());
+ connection.Start();
+ session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ }
+
+ /// <summary>
+ /// Disposes the session and the connection held in the fields of this fixture
+ /// and clears both fields.
+ /// </summary>
+ // NOTE(review): this override does not call base.TearDown(); confirm that is intentional.
+ //[TearDown]
+ public override void TearDown()
+ {
+     try
+     {
+         if(null != session)
+         {
+             session.Dispose();
+         }
+     }
+     finally
+     {
+         // Runs even when disposing the session throws, so the connection is
+         // always released instead of being leaked.
+         session = null;
+
+         if(null != connection)
+         {
+             IConnection connectionToDispose = connection;
+             connection = null;
+             connectionToDispose.Dispose();
+         }
+     }
+ }
+
+ //[Test]
+ //[ExpectedException(Handler="ExceptionValidationCheck")]
+ public virtual void TestBadConsumerException()
+ {
+ session.CreateConsumer(null);
+ }
+
+ public void ExceptionValidationCheck(Exception ex)
+ {
+ Assert.IsNotNull(ex as NMSException, "Invalid exception was thrown.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/BytesMessageTest.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/BytesMessageTest.cs b/src/test/csharp/BytesMessageTest.cs
new file mode 100644
index 0000000..d8c3af7
--- /dev/null
+++ b/src/test/csharp/BytesMessageTest.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.Test
+{
+ //[TestFixture]
+ public class BytesMessageTest : NMSTest
+ {
+ protected byte[] msgContent = {1, 2, 3, 4, 5, 6, 7, 8};
+
+ protected BytesMessageTest(NMSTestSupport testSupport)
+ : base(testSupport)
+ {
+ }
+
+ /// <summary>
+ /// Sends a bytes message created from msgContent and checks that the received
+ /// message is read-only, carries the same content and reports the delivery
+ /// mode it was sent with.
+ /// </summary>
+ /// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
+ /// <param name="testQueueRef">Passed to GetClearDestination to obtain the queue under test.</param>
+ //[Test]
+ public virtual void SendReceiveBytesMessage(
+     //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+     MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+     using(IConnection connection = CreateConnection(GetTestClientId()))
+     {
+         connection.Start();
+         using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+         // IDestination is disposable, so release it together with the other resources.
+         using(IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef))
+         using(IMessageConsumer consumer = session.CreateConsumer(destination))
+         using(IMessageProducer producer = session.CreateProducer(destination))
+         {
+             producer.DeliveryMode = deliveryMode;
+             IMessage request = session.CreateBytesMessage(msgContent);
+             producer.Send(request);
+
+             IMessage message = consumer.Receive(receiveTimeout);
+             AssertMessageIsReadOnly(message);
+             AssertBytesMessageEqual(request, message);
+             Assert.AreEqual(deliveryMode, message.NMSDeliveryMode, "NMSDeliveryMode does not match");
+         }
+     }
+ }
+
+ /// <summary>
+ /// Fills an empty bytes message through the typed Write methods, sends it and
+ /// checks that the received message is read-only, carries the same content
+ /// and reports the delivery mode it was sent with.
+ /// </summary>
+ /// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
+ /// <param name="testQueueRef">Passed to GetClearDestination to obtain the queue under test.</param>
+ //[Test]
+ public virtual void SendReceiveBytesMessageContent(
+     //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+     MsgDeliveryMode deliveryMode, string testQueueRef)
+ {
+     using(IConnection connection = CreateConnection(GetTestClientId()))
+     {
+         connection.Start();
+         using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+         // IDestination is disposable, so release it together with the other resources.
+         using(IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef))
+         using(IMessageConsumer consumer = session.CreateConsumer(destination))
+         using(IMessageProducer producer = session.CreateProducer(destination))
+         {
+             producer.DeliveryMode = deliveryMode;
+             IBytesMessage request = session.CreateBytesMessage();
+
+             // One write per method that AssertMessageIsReadOnly later probes.
+             request.WriteBoolean(true);
+             request.WriteByte((byte) 1);
+             request.WriteBytes(new byte[1]);
+             request.WriteBytes(new byte[3], 0, 2);
+             request.WriteChar('a');
+             request.WriteDouble(1.5);
+             request.WriteSingle((float) 1.5);
+             request.WriteInt32(1);
+             request.WriteInt64(1);
+             request.WriteObject("stringobj");
+             request.WriteInt16((short) 1);
+             request.WriteString("utfstring");
+
+             producer.Send(request);
+
+             IMessage message = consumer.Receive(receiveTimeout);
+             AssertMessageIsReadOnly(message);
+             AssertBytesMessageEqual(request, message);
+             Assert.AreEqual(deliveryMode, message.NMSDeliveryMode, "NMSDeliveryMode does not match");
+         }
+     }
+ }
+
+ protected void AssertMessageIsReadOnly(IMessage message)
+ {
+ Type writeableExceptionType = typeof(MessageNotWriteableException);
+ IBytesMessage theMessage = message as IBytesMessage;
+ Assert.IsNotNull(theMessage);
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteBoolean(true); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteByte((byte) 1); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteBytes(new byte[1]); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteBytes(new byte[3], 0, 2); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteChar('a'); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteDouble(1.5); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteSingle((float) 1.5); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteInt32(1); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteInt64(1); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteObject("stringobj"); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteInt16((short) 1); });
+ Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteString("utfstring"); });
+ }
+
+ /// <summary>
+ /// Assert that two messages are IBytesMessages and their contents are equal.
+ /// The expected message is reset before its content is read.
+ /// </summary>
+ /// <param name="expected">Message holding the expected content; must be an IBytesMessage.</param>
+ /// <param name="actual">Message to compare against it; must be an IBytesMessage.</param>
+ protected void AssertBytesMessageEqual(IMessage expected, IMessage actual)
+ {
+     IBytesMessage expectedBytesMsg = expected as IBytesMessage;
+     // Verify the cast before using it: calling Reset() first would turn a wrong
+     // message type into a NullReferenceException instead of this assertion failure.
+     Assert.IsNotNull(expectedBytesMsg, "'expected' message not a bytes message");
+     expectedBytesMsg.Reset();
+
+     IBytesMessage actualBytesMsg = actual as IBytesMessage;
+     Assert.IsNotNull(actualBytesMsg, "'actual' message not a bytes message");
+     Assert.AreEqual(expectedBytesMsg.Content, actualBytesMsg.Content, "Bytes message contents do not match.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/Commands/BytesMessage.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/BytesMessage.cs b/src/test/csharp/Commands/BytesMessage.cs
new file mode 100644
index 0000000..e581942
--- /dev/null
+++ b/src/test/csharp/Commands/BytesMessage.cs
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS;
+using Apache.NMS.Util;
+using System;
+using System.Collections;
+using System.IO;
+
+namespace Apache.NMS.Commands
+{
+ public class BytesMessage : Message, IBytesMessage
+ {
+ private EndianBinaryReader dataIn = null;
+ private EndianBinaryWriter dataOut = null;
+ private MemoryStream outputBuffer = null;
+ private int length = 0;
+
+ public override Object Clone()
+ {
+ StoreContent();
+ return base.Clone();
+ }
+
+ public override void ClearBody()
+ {
+ base.ClearBody();
+ this.outputBuffer = null;
+ this.dataIn = null;
+ this.dataOut = null;
+ this.length = 0;
+ }
+
+ public long BodyLength
+ {
+ get
+ {
+ InitializeReading();
+ return this.length;
+ }
+ }
+
+ public byte ReadByte()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadByte();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteByte( byte value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public bool ReadBoolean()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadBoolean();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteBoolean( bool value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public char ReadChar()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadChar();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteChar( char value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public short ReadInt16()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadInt16();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteInt16( short value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public int ReadInt32()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadInt32();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteInt32( int value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public long ReadInt64()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadInt64();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteInt64( long value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public float ReadSingle()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadSingle();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteSingle( float value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public double ReadDouble()
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.ReadDouble();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteDouble( double value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public int ReadBytes( byte[] value )
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.Read( value, 0, value.Length );
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public int ReadBytes( byte[] value, int length )
+ {
+ InitializeReading();
+ try
+ {
+ return dataIn.Read( value, 0, length );
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteBytes( byte[] value )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value, 0, value.Length );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public void WriteBytes( byte[] value, int offset, int length )
+ {
+ InitializeWriting();
+ try
+ {
+ dataOut.Write( value, offset, length );
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public string ReadString()
+ {
+ InitializeReading();
+ try
+ {
+ // JMS, CMS and NMS all encode the String using a 16 bit size header.
+ return dataIn.ReadString16();
+ }
+ catch(EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch(IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ public void WriteString( string value )
+ {
+ InitializeWriting();
+ try
+ {
+ // JMS, CMS and NMS all encode the String using a 16 bit size header.
+ dataOut.WriteString16(value);
+ }
+ catch(Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ /// <summary>
+ /// Writes a value to the message body, choosing the writer call from the
+ /// runtime type of the value. Supported types are Byte, Char, Boolean, Int16,
+ /// Int32, Int64, Single, Double, byte[] and String.
+ /// </summary>
+ /// <param name="value">The boxed primitive, byte array or string to write.</param>
+ /// <exception cref="MessageFormatException">
+ /// Thrown when the value is null or is not one of the supported types.
+ /// </exception>
+ public void WriteObject( System.Object value )
+ {
+     // Keep this first so that the check made by InitializeWriting() takes
+     // precedence over any complaint about the argument.
+     InitializeWriting();
+
+     if( value == null )
+     {
+         // Without this guard a null falls through to value.GetType() below
+         // and surfaces as a NullReferenceException.
+         throw new MessageFormatException("Cannot write null value.");
+     }
+
+     if( value is System.Byte )
+     {
+         this.dataOut.Write( (byte) value );
+     }
+     else if( value is Char )
+     {
+         this.dataOut.Write( (char) value );
+     }
+     else if( value is Boolean )
+     {
+         this.dataOut.Write( (bool) value );
+     }
+     else if( value is Int16 )
+     {
+         this.dataOut.Write( (short) value );
+     }
+     else if( value is Int32 )
+     {
+         this.dataOut.Write( (int) value );
+     }
+     else if( value is Int64 )
+     {
+         this.dataOut.Write( (long) value );
+     }
+     else if( value is Single )
+     {
+         this.dataOut.Write( (float) value );
+     }
+     else if( value is Double )
+     {
+         this.dataOut.Write( (double) value );
+     }
+     else if( value is byte[] )
+     {
+         this.dataOut.Write( (byte[]) value );
+     }
+     else if( value is String )
+     {
+         // Same string encoding call as WriteString.
+         this.dataOut.WriteString16( (string) value );
+     }
+     else
+     {
+         throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
+     }
+ }
+
+ /// <summary>
+ /// Gets the message body as a byte array, or writes a byte array into the body.
+ /// This property hides the base class Content property.
+ /// </summary>
+ public new byte[] Content
+ {
+ get
+ {
+ // Stays null when the body is empty (length == 0).
+ byte[] buffer = null;
+ InitializeReading();
+ if(this.length != 0)
+ {
+ buffer = new byte[this.length];
+ // NOTE(review): this reads through the same reader the Read* methods use and
+ // ignores the returned count; presumably it starts at the reader's current
+ // position, so earlier reads would shorten the result - confirm.
+ this.dataIn.Read(buffer, 0, buffer.Length);
+ }
+ return buffer;
+ }
+
+ set
+ {
+ InitializeWriting();
+ // NOTE(review): value is dereferenced without a null check, so assigning null
+ // throws a NullReferenceException - confirm whether null should be accepted.
+ this.dataOut.Write(value, 0, value.Length);
+ }
+ }
+
+ public void Reset()
+ {
+ StoreContent();
+ this.dataIn = null;
+ this.dataOut = null;
+ this.outputBuffer = null;
+ this.ReadOnlyBody = true;
+ }
+
+ /// <summary>
+ /// Prepares the message for reading: after the FailIfWriteOnlyBody() check,
+ /// creates the reader over the stored content if it does not exist yet and
+ /// records the content length.
+ /// </summary>
+ private void InitializeReading()
+ {
+     FailIfWriteOnlyBody();
+     if(this.dataIn == null)
+     {
+         // Read base.Content once and test the local that is actually used, so
+         // the null check cannot disagree with the array wrapped below.
+         byte[] data = base.Content;
+         if(data == null)
+         {
+             // No stored content: read from an empty body.
+             data = new byte[0];
+         }
+
+         // Non-writable stream over the stored bytes.
+         Stream target = new MemoryStream(data, false);
+
+         this.length = data.Length;
+         this.dataIn = new EndianBinaryReader(target);
+     }
+ }
+
+ private void InitializeWriting()
+ {
+ FailIfReadOnlyBody();
+ if(this.dataOut == null)
+ {
+ this.outputBuffer = new MemoryStream();
+ this.dataOut = new EndianBinaryWriter(this.outputBuffer);
+ }
+ }
+
+ private void StoreContent()
+ {
+ if(this.dataOut != null)
+ {
+ this.dataOut.Close();
+ base.Content = outputBuffer.ToArray();
+
+ this.dataOut = null;
+ this.outputBuffer = null;
+ }
+ }
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/Commands/Destination.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/Destination.cs b/src/test/csharp/Commands/Destination.cs
new file mode 100644
index 0000000..23c477d
--- /dev/null
+++ b/src/test/csharp/Commands/Destination.cs
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Base class for the destination commands (topics, queues and their
+    /// temporary variants). Holds the physical name of the destination and
+    /// any options parsed from a "?query" suffix of the name passed to the
+    /// constructor.
+    /// </summary>
+    public abstract class Destination : IDestination, ICloneable
+    {
+        /// <summary>
+        /// Topic Destination object
+        /// </summary>
+        public const int TOPIC = 1;
+        /// <summary>
+        /// Temporary Topic Destination object
+        /// </summary>
+        public const int TEMPORARY_TOPIC = 2;
+        /// <summary>
+        /// Queue Destination object
+        /// </summary>
+        public const int QUEUE = 3;
+        /// <summary>
+        /// Temporary Queue Destination object
+        /// </summary>
+        public const int TEMPORARY_QUEUE = 4;
+
+        // Markers wrapped around the client id by CreateTemporaryName() and
+        // searched for by GetClientId().
+        private const String TEMP_PREFIX = "{TD{";
+        private const String TEMP_POSTFIX = "}TD}";
+
+        private String physicalName = "";
+        private StringDictionary options = null;
+
+        private bool disposed = false;
+
+        /// <summary>
+        /// The Default Constructor
+        /// </summary>
+        protected Destination()
+        {
+        }
+
+        /// <summary>
+        /// Construct the Destination with a defined physical name. Anything
+        /// after the first '?' in the name is parsed as options.
+        /// </summary>
+        /// <param name="name">physical name, optionally followed by "?options"</param>
+        protected Destination(String name)
+        {
+            setPhysicalName(name);
+        }
+
+        ~Destination()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Disposes this destination, invoking <see cref="OnDispose"/> once,
+        /// and suppresses finalization.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Shared implementation of Dispose() and the finalizer.
+        /// </summary>
+        /// <param name="disposing">
+        /// true when called from Dispose(); only then is OnDispose() invoked.
+        /// </param>
+        private void Dispose(bool disposing)
+        {
+            if(disposed)
+            {
+                return;
+            }
+
+            if(disposing)
+            {
+                try
+                {
+                    OnDispose();
+                }
+                catch(Exception ex)
+                {
+                    // Disposal must not throw: log the failure and carry on.
+                    Tracer.ErrorFormat("Exception disposing Destination {0}: {1}", this.physicalName, ex.Message);
+                }
+            }
+
+            disposed = true;
+        }
+
+        /// <summary>
+        /// Child classes can override this method to perform clean-up logic.
+        /// </summary>
+        protected virtual void OnDispose()
+        {
+        }
+
+        /// <summary>
+        /// True when GetDestinationType() is TOPIC or TEMPORARY_TOPIC.
+        /// </summary>
+        public bool IsTopic
+        {
+            get
+            {
+                int destinationType = GetDestinationType();
+                return TOPIC == destinationType
+                    || TEMPORARY_TOPIC == destinationType;
+            }
+        }
+
+        /// <summary>
+        /// True when GetDestinationType() is QUEUE or TEMPORARY_QUEUE.
+        /// </summary>
+        public bool IsQueue
+        {
+            get
+            {
+                int destinationType = GetDestinationType();
+                return QUEUE == destinationType
+                    || TEMPORARY_QUEUE == destinationType;
+            }
+        }
+
+        /// <summary>
+        /// True when GetDestinationType() is TEMPORARY_QUEUE or TEMPORARY_TOPIC.
+        /// </summary>
+        public bool IsTemporary
+        {
+            get
+            {
+                int destinationType = GetDestinationType();
+                return TEMPORARY_QUEUE == destinationType
+                    || TEMPORARY_TOPIC == destinationType;
+            }
+        }
+
+        /// <summary>
+        /// Dictionary of name/value pairs representing option values specified
+        /// in the URI used to create this Destination. A null value is returned
+        /// if no options were specified.
+        /// </summary>
+        internal StringDictionary Options
+        {
+            get { return this.options; }
+        }
+
+        /// <summary>
+        /// Stores the physical name. When the name contains a '?', the part
+        /// before it becomes the physical name and the part after it is parsed
+        /// into <see cref="Options"/>.
+        /// </summary>
+        /// <param name="name">physical name, optionally followed by "?options"</param>
+        private void setPhysicalName(string name)
+        {
+            this.physicalName = name;
+
+            int p = name.IndexOf('?');
+            if(p >= 0)
+            {
+                String optstring = physicalName.Substring(p + 1);
+                this.physicalName = name.Substring(0, p);
+                options = URISupport.ParseQuery(optstring);
+            }
+        }
+
+        /// <summary>
+        /// Converts any IDestination into a Destination of this namespace.
+        /// An instance that already is a Destination is returned unchanged;
+        /// otherwise a new TempQueue, TempTopic, Queue or Topic is created
+        /// from the queue/topic name. The temporary interfaces are tested
+        /// before the plain ones.
+        /// </summary>
+        /// <param name="destination">destination to convert, may be null</param>
+        /// <returns>
+        /// the converted destination, or null when the argument is null or
+        /// implements none of the recognised interfaces
+        /// </returns>
+        public static Destination Transform(IDestination destination)
+        {
+            Destination result = null;
+            if(destination != null)
+            {
+                if(destination is Destination)
+                {
+                    result = (Destination) destination;
+                }
+                else
+                {
+                    if(destination is ITemporaryQueue)
+                    {
+                        result = new TempQueue(((IQueue) destination).QueueName);
+                    }
+                    else if(destination is ITemporaryTopic)
+                    {
+                        result = new TempTopic(((ITopic) destination).TopicName);
+                    }
+                    else if(destination is IQueue)
+                    {
+                        result = new Queue(((IQueue) destination).QueueName);
+                    }
+                    else if(destination is ITopic)
+                    {
+                        result = new Topic(((ITopic) destination).TopicName);
+                    }
+                }
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Create a temporary name from the clientId
+        /// </summary>
+        /// <param name="clientId">client id to embed between the temp markers</param>
+        /// <returns>the clientId wrapped in the temporary prefix and postfix</returns>
+        public static String CreateTemporaryName(String clientId)
+        {
+            return TEMP_PREFIX + clientId + TEMP_POSTFIX;
+        }
+
+        /// <summary>
+        /// From a temporary destination find the clientId of the Connection that created it
+        /// </summary>
+        /// <param name="destination">destination to inspect, may be null</param>
+        /// <returns>
+        /// the text between the temporary prefix and the last temporary
+        /// postfix of the physical name, or null if the destination is null,
+        /// not temporary, or its name does not contain a non-empty client id
+        /// between the markers
+        /// </returns>
+        public static String GetClientId(Destination destination)
+        {
+            String answer = null;
+            if(destination != null && destination.IsTemporary)
+            {
+                String name = destination.PhysicalName;
+                int start = name.IndexOf(TEMP_PREFIX);
+                if(start >= 0)
+                {
+                    start += TEMP_PREFIX.Length;
+                    int stop = name.LastIndexOf(TEMP_POSTFIX);
+                    if(stop > start && stop < name.Length)
+                    {
+                        // Substring takes (startIndex, length), not an end
+                        // index, so the length is the distance between the
+                        // two markers.
+                        answer = name.Substring(start, stop - start);
+                    }
+                }
+            }
+            return answer;
+        }
+
+        /// <summary>
+        /// Compares this destination with an arbitrary object.
+        /// </summary>
+        /// <param name="o">object to compare</param>
+        /// <returns>
+        /// the result of <see cref="CompareTo(Destination)"/> when o is a
+        /// Destination; otherwise -1
+        /// </returns>
+        public int CompareTo(Object o)
+        {
+            if(o is Destination)
+            {
+                return CompareTo((Destination) o);
+            }
+            return -1;
+        }
+
+        /// <summary>
+        /// Lets sort by name first then lets sort topics greater than queues
+        /// </summary>
+        /// <param name="that">another destination to compare against</param>
+        /// <returns>
+        /// a positive value if this is greater than that, 0 if they are
+        /// equal, or a negative value if this is less than that
+        /// </returns>
+        public int CompareTo(Destination that)
+        {
+            int answer = 0;
+            if(physicalName != that.physicalName)
+            {
+                // A null name sorts before any non-null name.
+                if(physicalName == null)
+                {
+                    return -1;
+                }
+                else if(that.physicalName == null)
+                {
+                    return 1;
+                }
+                answer = physicalName.CompareTo(that.physicalName);
+            }
+
+            // Same name: a topic sorts after a queue.
+            if(answer == 0)
+            {
+                if(IsTopic)
+                {
+                    if(that.IsQueue)
+                    {
+                        return 1;
+                    }
+                }
+                else
+                {
+                    if(that.IsTopic)
+                    {
+                        return -1;
+                    }
+                }
+            }
+            return answer;
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <returns>Returns the Destination type</returns>
+        public abstract int GetDestinationType();
+
+        /// <summary>
+        /// The physical name of this destination. The setter stores the value
+        /// as given; unlike the constructor it does not parse options.
+        /// </summary>
+        public String PhysicalName
+        {
+            get { return this.physicalName; }
+            set
+            {
+                this.physicalName = value;
+            }
+        }
+
+        /// <summary>
+        /// Formats the destination as a scheme derived from DestinationType
+        /// ("topic://", "temp-topic://", "temp-queue://", otherwise
+        /// "queue://") followed by the physical name.
+        /// </summary>
+        /// <returns>string representation of this instance</returns>
+        public override String ToString()
+        {
+            switch(DestinationType)
+            {
+            case DestinationType.Topic:
+                return "topic://" + PhysicalName;
+
+            case DestinationType.TemporaryTopic:
+                return "temp-topic://" + PhysicalName;
+
+            case DestinationType.TemporaryQueue:
+                return "temp-queue://" + PhysicalName;
+
+            default:
+                return "queue://" + PhysicalName;
+            }
+        }
+
+        /// <summary>
+        /// Hash of the physical name (37 when the name is null), with extra
+        /// bits mixed in for topics.
+        /// </summary>
+        /// <returns>hashCode for this instance</returns>
+        public override int GetHashCode()
+        {
+            int answer = 37;
+
+            if(this.physicalName != null)
+            {
+                answer = physicalName.GetHashCode();
+            }
+            if(IsTopic)
+            {
+                answer ^= 0xfabfab;
+            }
+            return answer;
+        }
+
+        /// <summary>
+        /// if the object passed in is equivalent, return true
+        /// </summary>
+        /// <param name="obj">the object to compare</param>
+        /// <returns>
+        /// true if obj is this instance, or a Destination with the same
+        /// destination type and the same physical name (two null names are
+        /// considered the same)
+        /// </returns>
+        public override bool Equals(Object obj)
+        {
+            if(Object.ReferenceEquals(this, obj))
+            {
+                return true;
+            }
+
+            Destination other = obj as Destination;
+            if(Object.ReferenceEquals(other, null))
+            {
+                return false;
+            }
+
+            // The physical name can be null (the PhysicalName setter accepts
+            // it), so use the static, null-safe string comparison.
+            return this.GetDestinationType() == other.GetDestinationType()
+                && String.Equals(this.physicalName, other.physicalName);
+        }
+
+        /// <summary>
+        /// Factory method to create a child destination if this destination is a composite
+        /// </summary>
+        /// <param name="name">physical name of the destination to create</param>
+        /// <returns>the created Destination</returns>
+        public abstract Destination CreateDestination(String name);
+
+        /// <summary>
+        /// The kind of destination, as a DestinationType value.
+        /// </summary>
+        public abstract DestinationType DestinationType
+        {
+            get;
+        }
+
+        /// <summary>
+        /// Creates a shallow copy of this destination.
+        /// </summary>
+        /// <returns>the copy, of the same runtime type as this instance</returns>
+        public virtual Object Clone()
+        {
+            // Since we are the lowest level base class, do a
+            // shallow copy which will include the derived classes.
+            // From here we would do deep cloning of other objects
+            // if we had any.
+            return this.MemberwiseClone();
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/Commands/MapMessage.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/MapMessage.cs b/src/test/csharp/Commands/MapMessage.cs
new file mode 100644
index 0000000..4ba9751
--- /dev/null
+++ b/src/test/csharp/Commands/MapMessage.cs
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Message command whose body is a map of primitive values. The map is
+    /// handed out wrapped in a PrimitiveMapInterceptor rather than directly.
+    /// </summary>
+    public class MapMessage : Message, IMapMessage
+    {
+        private PrimitiveMap body;
+        private PrimitiveMapInterceptor typeConverter;
+
+        /// <summary>
+        /// Creates a map message with no body; the body is created on first
+        /// access of <see cref="Body"/>.
+        /// </summary>
+        public MapMessage() : base()
+        {
+        }
+
+        /// <summary>
+        /// Creates a map message around an existing map.
+        /// </summary>
+        /// <param name="body">map to use as the message body</param>
+        public MapMessage(PrimitiveMap body) : base()
+        {
+            this.body = body;
+            this.typeConverter = new PrimitiveMapInterceptor(this, this.body);
+        }
+
+        /// <summary>
+        /// Drops the map and its interceptor, then clears the base body.
+        /// </summary>
+        public override void ClearBody()
+        {
+            this.body = null;
+            this.typeConverter = null;
+            base.ClearBody();
+        }
+
+        /// <summary>
+        /// Read-only flag of the body. Setting it also updates the ReadOnly
+        /// flag of the interceptor, when one exists.
+        /// </summary>
+        public override bool ReadOnlyBody
+        {
+            get { return base.ReadOnlyBody; }
+
+            set
+            {
+                if(this.typeConverter != null)
+                {
+                    // Follow the requested state; forcing true here would
+                    // leave the map read-only after ReadOnlyBody = false.
+                    this.typeConverter.ReadOnly = value;
+                }
+
+                base.ReadOnlyBody = value;
+            }
+        }
+
+
+        /// <summary>
+        /// The map body, exposed through the interceptor. Getting creates an
+        /// empty map on first use. Setting null removes the body; setting any
+        /// other map wraps it in a new interceptor.
+        /// </summary>
+        public IPrimitiveMap Body
+        {
+            get
+            {
+                // Key the lazy creation on the interceptor, not on the body
+                // field: the setter leaves the body field null when given a
+                // map that is not a PrimitiveMap, and that map must not be
+                // replaced by an empty one here.
+                if(this.typeConverter == null)
+                {
+                    this.body = new PrimitiveMap();
+                    this.typeConverter = new PrimitiveMapInterceptor(this, this.body);
+                }
+
+                return this.typeConverter;
+            }
+
+            set
+            {
+                this.body = value as PrimitiveMap;
+                if(value != null)
+                {
+                    this.typeConverter = new PrimitiveMapInterceptor(this, value);
+                }
+                else
+                {
+                    this.typeConverter = null;
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/Commands/Message.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/Message.cs b/src/test/csharp/Commands/Message.cs
new file mode 100644
index 0000000..5cadd5f
--- /dev/null
+++ b/src/test/csharp/Commands/Message.cs
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Basic in-memory message command: holds the NMS header values, an
+    /// opaque byte content, and a lazily created property map.
+    /// </summary>
+    public class Message : IMessage, ICloneable
+    {
+        private IDestination destination;
+        private string transactionId;
+        private string messageId;
+        private string groupID;
+        private int groupSequence;
+        private string correlationId;
+        private bool persistent;
+        private long expiration;
+        private byte priority;
+        private IDestination replyTo;
+        private long timestamp;
+        private string type;
+        private bool redelivered;
+        private byte[] content;
+        private bool readOnlyMsgProperties;
+        private bool readOnlyMsgBody;
+
+        // Both are created together, on first access of Properties.
+        private MessagePropertyIntercepter propertyHelper;
+        private PrimitiveMap properties;
+
+        /// <summary>
+        /// Clone this object and return a new instance that the caller now owns.
+        /// </summary>
+        /// <returns>a shallow copy of this message</returns>
+        public virtual Object Clone()
+        {
+            // Since we are the lowest level base class, do a
+            // shallow copy which will include the derived classes.
+            // From here we would do deep cloning of other objects
+            // if we had any.
+            // NOTE(review): the shallow copy shares the content array, the
+            // property map and the property helper (which was constructed
+            // with this instance) with the original - confirm that is intended.
+            Message o = (Message) this.MemberwiseClone();
+
+            if(this.messageId != null)
+            {
+                o.NMSMessageId = (string) this.messageId.Clone();
+            }
+
+            return o;
+        }
+
+        /// <summary>
+        /// Returns a string containing the information for this DataStructure
+        /// such as its type and value of its elements.
+        /// </summary>
+        public override string ToString()
+        {
+            return GetType().Name + "[" +
+                "Destination=" + destination + ", " +
+                "TransactionId=" + transactionId + ", " +
+                "MessageId=" + messageId + ", " +
+                "GroupID=" + groupID + ", " +
+                "GroupSequence=" + groupSequence + ", " +
+                "CorrelationId=" + correlationId + ", " +
+                "Expiration=" + expiration + ", " +
+                "Priority=" + priority + ", " +
+                "ReplyTo=" + replyTo + ", " +
+                "Timestamp=" + timestamp + ", " +
+                "Type=" + type + ", " +
+                "Redelivered=" + redelivered +
+                "]";
+        }
+
+        /// <summary>
+        /// Does nothing in this implementation.
+        /// </summary>
+        public void Acknowledge()
+        {
+        }
+
+        /// <summary>
+        /// Discards the byte content of the message.
+        /// </summary>
+        public virtual void ClearBody()
+        {
+            this.content = null;
+        }
+
+        /// <summary>
+        /// Removes all entries from the property map, if one has been created.
+        /// </summary>
+        public virtual void ClearProperties()
+        {
+            // The map is only created on first access of Properties, so it
+            // may not exist yet; there is nothing to clear in that case.
+            if(this.properties != null)
+            {
+                this.properties.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Throws MessageNotWriteableException when the body is read-only.
+        /// </summary>
+        protected void FailIfReadOnlyBody()
+        {
+            if(ReadOnlyBody == true)
+            {
+                throw new MessageNotWriteableException("Message is in Read-Only mode.");
+            }
+        }
+
+        /// <summary>
+        /// Throws MessageNotReadableException when the body is not read-only.
+        /// </summary>
+        protected void FailIfWriteOnlyBody()
+        {
+            if(ReadOnlyBody == false)
+            {
+                throw new MessageNotReadableException("Message is in Write-Only mode.");
+            }
+        }
+
+        #region Properties
+
+        /// <summary>
+        /// The transaction id stored with this message.
+        /// </summary>
+        public string TransactionId
+        {
+            get { return this.transactionId; }
+            set { this.transactionId = value; }
+        }
+
+        /// <summary>
+        /// The raw byte content of the message; null when there is none.
+        /// </summary>
+        public byte[] Content
+        {
+            get { return content; }
+            set { this.content = value; }
+        }
+
+        /// <summary>
+        /// Whether the message properties are flagged read-only.
+        /// </summary>
+        public virtual bool ReadOnlyProperties
+        {
+            get { return this.readOnlyMsgProperties; }
+            set { this.readOnlyMsgProperties = value; }
+        }
+
+        /// <summary>
+        /// Whether the message body is flagged read-only.
+        /// </summary>
+        public virtual bool ReadOnlyBody
+        {
+            get { return this.readOnlyMsgBody; }
+            set { this.readOnlyMsgBody = value; }
+        }
+
+        /// <summary>
+        /// The message properties, exposed through a MessagePropertyIntercepter.
+        /// The underlying map and the intercepter are created on first access,
+        /// using the value of ReadOnlyProperties at that moment.
+        /// </summary>
+        public IPrimitiveMap Properties
+        {
+            get
+            {
+                if(null == properties)
+                {
+                    properties = new PrimitiveMap();
+                    propertyHelper = new MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties);
+                    propertyHelper.AllowByteArrays = false;
+                }
+
+                return propertyHelper;
+            }
+        }
+
+        /// <summary>
+        /// The correlation ID used to correlate messages with conversations or long running business processes
+        /// </summary>
+        public string NMSCorrelationID
+        {
+            get { return correlationId; }
+            set { correlationId = value; }
+        }
+
+        /// <summary>
+        /// The destination of the message
+        /// </summary>
+        public IDestination NMSDestination
+        {
+            get { return destination; }
+            set { this.destination = Destination.Transform(value); }
+        }
+
+        private TimeSpan timeToLive = TimeSpan.FromMilliseconds(0);
+        /// <summary>
+        /// The time span after which this message should expire. Setting a
+        /// positive value recomputes the expiration from the message timestamp
+        /// (or from the current UTC time when no timestamp is set); any other
+        /// value resets the expiration to 0.
+        /// </summary>
+        public TimeSpan NMSTimeToLive
+        {
+            get { return timeToLive; }
+
+            set
+            {
+                timeToLive = value;
+                if(timeToLive.TotalMilliseconds > 0)
+                {
+                    long timeStamp = timestamp;
+
+                    if(timeStamp == 0)
+                    {
+                        timeStamp = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
+                    }
+
+                    expiration = timeStamp + (long) timeToLive.TotalMilliseconds;
+                }
+                else
+                {
+                    expiration = 0;
+                }
+            }
+        }
+
+        /// <summary>
+        /// The timestamp the broker added to the message
+        /// </summary>
+        public DateTime NMSTimestamp
+        {
+            get { return DateUtils.ToDateTime(timestamp); }
+            set
+            {
+                timestamp = DateUtils.ToJavaTimeUtc(value);
+                // Keep the expiration in step with the new timestamp.
+                if(timeToLive.TotalMilliseconds > 0)
+                {
+                    expiration = timestamp + (long) timeToLive.TotalMilliseconds;
+                }
+            }
+        }
+
+        /// <summary>
+        /// The message ID which is set by the provider
+        /// </summary>
+        public string NMSMessageId
+        {
+            get { return this.messageId; }
+            set { this.messageId = value; }
+        }
+
+        /// <summary>
+        /// Whether or not this message is persistent
+        /// </summary>
+        public MsgDeliveryMode NMSDeliveryMode
+        {
+            get { return (persistent ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); }
+            set { persistent = (MsgDeliveryMode.Persistent == value); }
+        }
+
+        /// <summary>
+        /// The Priority on this message
+        /// </summary>
+        public MsgPriority NMSPriority
+        {
+            get { return (MsgPriority) priority; }
+            set { priority = (byte) value; }
+        }
+
+        /// <summary>
+        /// Returns true if this message has been redelivered to this or another consumer before being acknowledged successfully.
+        /// </summary>
+        public bool NMSRedelivered
+        {
+            get { return this.redelivered; }
+            set { this.redelivered = value; }
+        }
+
+        /// <summary>
+        /// The destination that the consumer of this message should send replies to
+        /// </summary>
+        public IDestination NMSReplyTo
+        {
+            get { return replyTo; }
+            set { replyTo = Destination.Transform(value); }
+        }
+
+        /// <summary>
+        /// The type name of this message
+        /// </summary>
+        public string NMSType
+        {
+            get { return type; }
+            set { type = value; }
+        }
+
+        #endregion
+
+        #region NMS Extension headers
+
+        /// <summary>
+        /// Returns the number of times this message has been redelivered to other consumers without being acknowledged successfully.
+        /// This implementation always returns 0.
+        /// </summary>
+        public int NMSXDeliveryCount
+        {
+            get { return 0; }
+        }
+
+        /// <summary>
+        /// The Message Group ID used to group messages together to the same consumer for the same group ID value
+        /// </summary>
+        public string NMSXGroupID
+        {
+            get { return groupID; }
+            set { groupID = value; }
+        }
+        /// <summary>
+        /// The Message Group Sequence counter to indicate the position in a group
+        /// </summary>
+        public int NMSXGroupSeq
+        {
+            get { return groupSequence; }
+            set { groupSequence = value; }
+        }
+
+        /// <summary>
+        /// Returns the ID of the producers transaction, or an empty string
+        /// when no transaction id is set.
+        /// </summary>
+        public string NMSXProducerTXID
+        {
+            get
+            {
+                if(null != transactionId)
+                {
+                    return transactionId;
+                }
+
+                return String.Empty;
+            }
+        }
+
+        #endregion
+
+    };
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/Commands/ObjectMessage.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/ObjectMessage.cs b/src/test/csharp/Commands/ObjectMessage.cs
new file mode 100644
index 0000000..ba68ee0
--- /dev/null
+++ b/src/test/csharp/Commands/ObjectMessage.cs
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections;
+using System.IO;
+
+using Apache.NMS;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Message command whose body is an arbitrary object reference.
+    /// </summary>
+    public class ObjectMessage : Message, IObjectMessage
+    {
+        private object body;
+
+        /// <summary>
+        /// Returns the runtime type name followed by "[ ]"; no field values
+        /// are included.
+        /// </summary>
+        public override string ToString()
+        {
+            string typeName = GetType().Name;
+            return typeName + "[" + " ]";
+        }
+
+        // Properties
+
+        /// <summary>
+        /// The object carried by this message; stored and returned as given.
+        /// </summary>
+        public object Body
+        {
+            get
+            {
+                return this.body;
+            }
+
+            set
+            {
+                this.body = value;
+            }
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-xms/blob/653d676d/src/test/csharp/Commands/Queue.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/Queue.cs b/src/test/csharp/Commands/Queue.cs
new file mode 100644
index 0000000..c0dd5f9
--- /dev/null
+++ b/src/test/csharp/Commands/Queue.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Queue destination command: a Destination whose type is
+    /// DestinationType.Queue / QUEUE.
+    /// </summary>
+    public class Queue : Destination, IQueue
+    {
+        /// <summary>
+        /// Creates a queue without assigning a name.
+        /// </summary>
+        public Queue()
+            : base()
+        {
+        }
+
+        /// <summary>
+        /// Creates a queue with the given name, which is passed to the base
+        /// constructor.
+        /// </summary>
+        /// <param name="name">name of the queue</param>
+        public Queue(String name)
+            : base(name)
+        {
+        }
+
+        /// <summary>
+        /// Always DestinationType.Queue.
+        /// </summary>
+        public override DestinationType DestinationType
+        {
+            get { return DestinationType.Queue; }
+        }
+
+        /// <summary>
+        /// The queue name, which is the physical name of the destination.
+        /// </summary>
+        public String QueueName
+        {
+            get
+            {
+                return PhysicalName;
+            }
+        }
+
+        /// <summary>
+        /// </summary>
+        /// <returns>the QUEUE type constant</returns>
+        public override int GetDestinationType()
+        {
+            return QUEUE;
+        }
+
+        /// <summary>
+        /// Creates another queue with the given name.
+        /// </summary>
+        /// <param name="name">name of the queue to create</param>
+        /// <returns>the new Queue</returns>
+        public override Destination CreateDestination(String name)
+        {
+            Queue created = new Queue(name);
+            return created;
+        }
+
+        /// <summary>
+        /// Returns a copy of this queue produced by the base class Clone().
+        /// </summary>
+        public override Object Clone()
+        {
+            // The base class performs the shallow copy, which already covers
+            // this derived type. Queue adds no fields of its own, so there is
+            // no deep-copy work to do here; revisit if fields are added.
+            return (Queue) base.Clone();
+        }
+    }
+}
+