You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:48 UTC
[11/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client
Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs
new file mode 100644
index 0000000..9754783
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+ using Cloak;
+ using Factory;
+ using Util;
+ using Util.Types;
+ using Util.Types.Map.AMQP;
+ class AMQPMapMessageCloak : AMQPMessageCloak, IMapMessageCloak
+ {
+ private IPrimitiveMap map = null;
+ private Map amqpmap = null;
+
+
+ /// <summary>
+ /// Creates a map message cloak for the given connection (delegating to the
+ /// base constructor) and then prepares the map body via InitializeMapBody().
+ /// </summary>
+ internal AMQPMapMessageCloak(Connection conn) : base(conn)
+ {
+ InitializeMapBody();
+ }
+
+ /// <summary>
+ /// Wraps an existing AMQP message handed over with its consumer and then
+ /// binds (or validates) its body via InitializeMapBody().
+ /// </summary>
+ internal AMQPMapMessageCloak(MessageConsumer c, Amqp.Message msg) : base(c, msg)
+ {
+ InitializeMapBody();
+ }
+
+ // Type code reported for this cloak: always MessageSupport.JMS_TYPE_MAP.
+ internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MAP; } }
+
+ /// <summary>
+ /// Binds the <c>amqpmap</c> and <c>map</c> fields to the body of the wrapped message.
+ /// A missing body section, or an AmqpValue section holding null, is populated
+ /// with a new empty Map; an AmqpValue already holding a Map is reused as-is.
+ /// </summary>
+ /// <exception cref="NMSException">
+ /// The body section is not an AmqpValue, or its value is neither null nor a Map.
+ /// </exception>
+ private void InitializeMapBody()
+ {
+     if (message.BodySection == null)
+     {
+         // No body at all: attach a new AmqpValue section wrapping an empty map.
+         amqpmap = new Map();
+         map = new AMQPValueMap(amqpmap);
+         message.BodySection = new AmqpValue { Value = amqpmap };
+         return;
+     }
+
+     AmqpValue section = message.BodySection as AmqpValue;
+     if (section == null)
+     {
+         throw new NMSException("Invalid message body type.");
+     }
+
+     object payload = section.Value;
+     if (payload == null)
+     {
+         // Section exists but is empty: give it a new map to hold entries.
+         amqpmap = new Map();
+         map = new AMQPValueMap(amqpmap);
+         section.Value = amqpmap;
+         return;
+     }
+
+     Map existing = payload as Map;
+     if (existing == null)
+     {
+         throw new NMSException(string.Format("Invalid message body value type. Type: {0}.", payload.GetType().Name));
+     }
+
+     amqpmap = existing;
+     map = new AMQPValueMap(amqpmap);
+ }
+
+ /// <summary>
+ /// Returns the IPrimitiveMap assigned by InitializeMapBody(), i.e. the
+ /// AMQPValueMap built over this message's AMQP map.
+ /// </summary>
+ IPrimitiveMap IMapMessageCloak.Map
+ {
+ get
+ {
+ return map;
+ }
+ }
+
+ /// <summary>
+ /// Creates a new map message cloak on the same connection and fills it
+ /// through CopyInto(), then returns it.
+ /// </summary>
+ IMapMessageCloak IMapMessageCloak.Copy()
+ {
+ IMapMessageCloak copy = new AMQPMapMessageCloak(Connection);
+ CopyInto(copy);
+ return copy;
+ }
+
+ /// <summary>
+ /// Copies the base message state (via the base implementation) and then every
+ /// entry of this message's map body into <paramref name="msg"/>.
+ /// </summary>
+ /// <param name="msg">Destination cloak; must implement IMapMessageCloak.</param>
+ /// <exception cref="NMSException">
+ /// <paramref name="msg"/> does not implement IMapMessageCloak.
+ /// </exception>
+ protected override void CopyInto(IMessageCloak msg)
+ {
+     base.CopyInto(msg);
+
+     IMapMessageCloak target = msg as IMapMessageCloak;
+     if (target == null)
+     {
+         throw new NMSException("Cannot copy a map message body into a message that is not a map message.");
+     }
+
+     IPrimitiveMap copy = target.Map;
+     foreach (string key in this.map.Keys)
+     {
+         object value = map[key];
+         byte[] bytes = value as byte[];
+         if (bytes != null)
+         {
+             // Go through SetBytes; for the common IPrimitiveMap implementations
+             // this presumably stores its own copy of the array (not verified here).
+             copy.SetBytes(key, bytes);
+         }
+         else if (value is IDictionary)
+         {
+             // Interface test instead of exact-type equality: a runtime type is never
+             // equal to typeof(IDictionary), so the old check only matched Amqp.Types.Map.
+             // The dictionary itself is shared with the copy (reference copy).
+             copy.SetDictionary(key, (IDictionary)value);
+         }
+         else if (value is IList && !(value is Array))
+         {
+             // Same reasoning for lists. Arrays also implement IList, so they are
+             // excluded to keep sending them through the indexer as before.
+             copy.SetList(key, (IList)value);
+         }
+         else
+         {
+             // Primitives, strings, nulls and anything else are assigned directly.
+             copy[key] = value;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Returns the base description followed, when a map is bound, by a
+ /// "Message Body" section rendered with ConversionSupport.ToString.
+ /// </summary>
+ public override string ToString()
+ {
+     string text = base.ToString();
+     if (this.map == null)
+     {
+         return text;
+     }
+     return text + string.Format("\nMessage Body: {0}\n", ConversionSupport.ToString(this.map));
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs b/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs
new file mode 100644
index 0000000..95f1465
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+ using Util;
+ using Cloak;
+
+ class AMQPMessageBuilder
+ {
+ /// <summary>
+ /// Builds the NMS message for a received AMQP message. The message-type
+ /// annotation is consulted first; when it yields nothing the body is inspected.
+ /// </summary>
+ /// <exception cref="NMSException">Neither strategy produced a message.</exception>
+ public static IMessage CreateProviderMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     IMessage created = CreateFromMessageAnnontations(consumer, message);
+     if (created != null)
+     {
+         return created;
+     }
+
+     created = CreateFromMessageBody(consumer, message);
+     if (created != null)
+     {
+         return created;
+     }
+
+     throw new NMSException("Could not create NMS Message.");
+ }
+
+ /// <summary>
+ /// Chooses the NMS message type from the AMQP body section, the body value
+ /// and the content-type property. The checks are evaluated strictly in the
+ /// order written; the first match decides the message type.
+ /// </summary>
+ private static IMessage CreateFromMessageBody(MessageConsumer consumer, Amqp.Message message)
+ {
+ IMessage msg = null;
+ object body = message.Body;
+ if(body == null)
+ {
+ // Null body value: the content type alone decides the message type.
+ if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
+ {
+ msg = CreateObjectMessage(consumer, message);
+ }
+ // IsContentType(null, ...) is the "no content type set" test; because of the ||
+ // it is evaluated whenever the octet-stream comparison fails.
+ else if (IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
+ {
+ msg = CreateBytesMessage(consumer, message);
+ }
+ else
+ {
+ // Any other content type is treated as text; the generic message is the fallback.
+ Symbol contentType = GetContentType(message);
+ if(contentType != null)
+ {
+ msg = CreateTextMessage(consumer, message);
+ }
+ else
+ {
+ msg = CreateMessage(consumer, message);
+ }
+ }
+ }
+ else if (message.BodySection is Data)
+ {
+ // Data section: bytes by default, object for the serialized-java content type,
+ // text for any other content type.
+ if(IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
+ {
+ msg = CreateBytesMessage(consumer, message);
+ }
+ else if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
+ {
+ msg = CreateObjectMessage(consumer, message);
+ }
+ else
+ {
+ Symbol contentType = GetContentType(message);
+ if(contentType != null)
+ {
+ msg = CreateTextMessage(consumer, message);
+ }
+ else
+ {
+ msg = CreateBytesMessage(consumer, message);
+ }
+ }
+ }
+ else if (message.BodySection is AmqpSequence)
+ {
+ // Sequence bodies are always surfaced as object messages.
+ msg = CreateObjectMessage(consumer, message);
+ }
+ // Remaining cases: decide on the runtime type of the body value.
+ else if (body is string)
+ {
+ msg = CreateTextMessage(consumer, message);
+ }
+ else if (body is byte[])
+ {
+ msg = CreateBytesMessage(consumer, message);
+ }
+ else
+ {
+ msg = CreateObjectMessage(consumer, message);
+ }
+
+ return msg;
+ }
+
+ /// <summary>
+ /// Returns the content-type symbol of <paramref name="message"/>, or null
+ /// when the message carries no properties section.
+ /// </summary>
+ private static Symbol GetContentType(Amqp.Message message)
+ {
+     Properties section = message.Properties;
+     return section != null ? section.ContentType : null;
+ }
+
+ /// <summary>
+ /// Tests whether the content type of <paramref name="message"/> equals
+ /// <paramref name="type"/>. Passing null for <paramref name="type"/> asks
+ /// "does the message have no content type?".
+ /// </summary>
+ /// <param name="type">Expected content type, or null for "none".</param>
+ /// <param name="message">Message whose content type is inspected.</param>
+ /// <returns>
+ /// True when both are null or when they are equal; false otherwise.
+ /// </returns>
+ private static bool IsContentType(Symbol type, Amqp.Message message)
+ {
+     Symbol contentType = GetContentType(message);
+     if (contentType == null)
+     {
+         return type == null;
+     }
+
+     if (type == null)
+     {
+         // The message has a content type but the caller asked for "none".
+         // Previously this dereferenced the null 'type' and threw.
+         return false;
+     }
+
+     return type.Equals(contentType);
+ }
+
+ /// <summary>
+ /// Creates the NMS message indicated by the SymbolUtil.JMSX_OPT_MSG_TYPE
+ /// message annotation.
+ /// </summary>
+ /// <returns>
+ /// The created message, or null when the message has no annotations section
+ /// or the annotation is absent or not an sbyte, so the caller can fall back
+ /// to body inspection.
+ /// </returns>
+ /// <exception cref="NMSException">The annotation holds an unknown type code.</exception>
+ private static IMessage CreateFromMessageAnnontations(MessageConsumer consumer, Amqp.Message message)
+ {
+     // The annotations section can be absent altogether; treat that as "no hint".
+     MessageAnnotations annotations = message.MessageAnnotations;
+     if (annotations == null)
+     {
+         return null;
+     }
+
+     object objVal = annotations[SymbolUtil.JMSX_OPT_MSG_TYPE];
+     if (!(objVal is SByte))
+     {
+         return null;
+     }
+
+     sbyte raw = (sbyte)objVal;
+     if (raw < 0)
+     {
+         // Convert.ToByte would raise OverflowException here; report it like any
+         // other unknown type code instead.
+         throw new NMSException("Unsupported Msg Annontation type: " + raw);
+     }
+
+     byte type = (byte)raw;
+     switch (type)
+     {
+         case MessageSupport.JMS_TYPE_MSG:
+             return CreateMessage(consumer, message);
+         case MessageSupport.JMS_TYPE_BYTE:
+             return CreateBytesMessage(consumer, message);
+         case MessageSupport.JMS_TYPE_TXT:
+             return CreateTextMessage(consumer, message);
+         case MessageSupport.JMS_TYPE_OBJ:
+             return CreateObjectMessage(consumer, message);
+         case MessageSupport.JMS_TYPE_STRM:
+             return CreateStreamMessage(consumer, message);
+         case MessageSupport.JMS_TYPE_MAP:
+             return CreateMapMessage(consumer, message);
+         default:
+             throw new NMSException("Unsupported Msg Annontation type: " + type);
+     }
+ }
+
+ /// <summary>Wraps the AMQP message in an AMQPMessageCloak and returns a plain Message over it.</summary>
+ private static IMessage CreateMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     IMessageCloak wrapped = new AMQPMessageCloak(consumer, message);
+     IMessage result = new Message(wrapped);
+     return result;
+ }
+
+ /// <summary>Wraps the AMQP message in an AMQPTextMessageCloak and returns a TextMessage over it.</summary>
+ private static IMessage CreateTextMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     ITextMessageCloak wrapped = new AMQPTextMessageCloak(consumer, message);
+     IMessage result = new TextMessage(wrapped);
+     return result;
+ }
+
+ /// <summary>Wraps the AMQP message in an AMQPStreamMessageCloak and returns a StreamMessage over it.</summary>
+ private static IMessage CreateStreamMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     IStreamMessageCloak wrapped = new AMQPStreamMessageCloak(consumer, message);
+     IMessage result = new StreamMessage(wrapped);
+     return result;
+ }
+
+ /// <summary>Wraps the AMQP message in an AMQPObjectMessageCloak and returns an ObjectMessage over it.</summary>
+ private static IMessage CreateObjectMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     IObjectMessageCloak wrapped = new AMQPObjectMessageCloak(consumer, message);
+     IMessage result = new ObjectMessage(wrapped);
+     return result;
+ }
+
+ /// <summary>Wraps the AMQP message in an AMQPMapMessageCloak and returns a MapMessage over it.</summary>
+ private static IMessage CreateMapMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     IMapMessageCloak wrapped = new AMQPMapMessageCloak(consumer, message);
+     IMessage result = new MapMessage(wrapped);
+     return result;
+ }
+
+ /// <summary>Wraps the AMQP message in an AMQPBytesMessageCloak and returns a BytesMessage over it.</summary>
+ private static IMessage CreateBytesMessage(MessageConsumer consumer, Amqp.Message message)
+ {
+     IBytesMessageCloak wrapped = new AMQPBytesMessageCloak(consumer, message);
+     IMessage result = new BytesMessage(wrapped);
+     return result;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs
new file mode 100644
index 0000000..dde222e
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Util.Types;
+using Amqp;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+ using Util;
+ using Util.Types.Map.AMQP;
+ using Cloak;
+ using Factory;
+ using System.Reflection;
+
+ class AMQPMessageCloak : IMessageCloak
+ {
+ private TimeSpan timeToLive;
+ private IDestination replyTo;
+ private bool redelivered = false;
+ private string msgId;
+ private IDestination destination;
+ private string correlationId;
+ private IPrimitiveMap properties;
+ private MessagePropertyIntercepter propertyHelper;
+
+ private Header messageHeader = null;
+ private DeliveryAnnotations deliveryAnnontations = null;
+ private MessageAnnotations messageAnnontations = null;
+ private ApplicationProperties applicationProperties = null;
+ private Properties messageProperties = null;
+
+#pragma warning disable CS0414
+ private Footer messageFooter = null;
+#pragma warning restore CS0414
+
+ private byte[] content;
+ private bool readOnlyProperties = false;
+
+ protected Amqp.Message message;
+ protected readonly Connection connection;
+ protected MessageConsumer consumer;
+
+ /// <summary>
+ /// Creates a cloak around a newly allocated Amqp.Message for the given
+ /// connection and runs InitMessage() on it. No consumer is associated.
+ /// </summary>
+ internal AMQPMessageCloak(Connection c)
+ {
+ message = new Amqp.Message();
+ connection = c;
+ InitMessage();
+ }
+
+ /// <summary>
+ /// Creates a cloak around an existing Amqp.Message delivered to consumer
+ /// <paramref name="c"/>. The connection is taken from the consumer's session;
+ /// InitMessage() and InitDeliveryAnnotations() are then run.
+ /// </summary>
+ internal AMQPMessageCloak(MessageConsumer c, Amqp.Message msg)
+ {
+ message = msg;
+ consumer = c;
+ connection = c.Session.Connection;
+ InitMessage();
+ InitDeliveryAnnotations();
+ }
+
+
+ #region Internal Properties
+
+ // Connection this cloak was created with.
+ internal Connection Connection { get { return connection; } }
+
+ // The wrapped AMQP message instance.
+ internal Amqp.Message AMQPMessage { get { return message; } }
+
+ // Type code for this cloak; the base class reports MessageSupport.JMS_TYPE_MSG.
+ // InitMessage() writes this value into the message-type annotation.
+ internal virtual byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MSG; } }
+
+ #endregion
+
+ /// <summary>
+ /// Binds the header and properties sections and stamps the message-type
+ /// annotation (SymbolUtil.JMSX_OPT_MSG_TYPE) with JMSMessageType as an sbyte.
+ /// </summary>
+ private void InitMessage()
+ {
+ InitMessageHeader();
+ InitMessageProperties();
+ // JMSMessageType is virtual, so a subclass's type code is what gets written here.
+ SetMessageAnnotation(SymbolUtil.JMSX_OPT_MSG_TYPE, (sbyte)JMSMessageType);
+ }
+
+ /// <summary>
+ /// Copies this message's NMS properties into <paramref name="msg"/> using
+ /// MessageTransformation.CopyNMSMessageProperties and shares the same
+ /// AckHandler reference with it.
+ /// </summary>
+ protected virtual void CopyInto(IMessageCloak msg)
+ {
+ MessageTransformation.CopyNMSMessageProperties(this, msg);
+ msg.AckHandler = this.AckHandler;
+ }
+
+ #region Protected Amqp.Message Initialize/Accessor
+
+ /// <summary>
+ /// Makes the cached header and the message's Header section refer to the same
+ /// instance: adopts the message's section when nothing is cached, creates a new
+ /// Header when neither exists, and installs the cached one when the message has none.
+ /// </summary>
+ protected void InitMessageHeader()
+ {
+     if (this.messageHeader == null)
+     {
+         this.messageHeader = this.message.Header ?? new Header();
+     }
+
+     if (this.message.Header == null)
+     {
+         this.message.Header = this.messageHeader;
+     }
+ }
+
+ /// <summary>
+ /// Makes the cached properties and the message's Properties section refer to the
+ /// same instance: adopts the message's section when nothing is cached, creates a
+ /// new one when neither exists, and installs the cached one when the message has none.
+ /// </summary>
+ protected void InitMessageProperties()
+ {
+     if (this.messageProperties == null)
+     {
+         this.messageProperties = this.message.Properties ?? new Properties();
+     }
+
+     if (this.message.Properties == null)
+     {
+         this.message.Properties = this.messageProperties;
+     }
+ }
+
+ /// <summary>
+ /// Makes the cached application properties and the message's ApplicationProperties
+ /// section refer to the same instance: adopts the message's section when nothing is
+ /// cached, creates a new one when neither exists, and installs the cached one when
+ /// the message has none.
+ /// </summary>
+ protected void InitApplicationProperties()
+ {
+     if (this.applicationProperties == null)
+     {
+         this.applicationProperties = this.message.ApplicationProperties ?? new ApplicationProperties();
+     }
+
+     if (this.message.ApplicationProperties == null)
+     {
+         this.message.ApplicationProperties = this.applicationProperties;
+     }
+ }
+
+ /// <summary>
+ /// Makes the cached delivery annotations and the message's DeliveryAnnotations
+ /// section refer to the same instance: adopts the message's section when nothing is
+ /// cached, creates a new one when neither exists, and installs the cached one when
+ /// the message has none.
+ /// </summary>
+ protected void InitDeliveryAnnotations()
+ {
+     if (this.deliveryAnnontations == null)
+     {
+         this.deliveryAnnontations = this.message.DeliveryAnnotations ?? new DeliveryAnnotations();
+     }
+
+     if (this.message.DeliveryAnnotations == null)
+     {
+         this.message.DeliveryAnnotations = this.deliveryAnnontations;
+     }
+ }
+
+ /// <summary>
+ /// Makes the cached message annotations and the message's MessageAnnotations
+ /// section refer to the same instance: adopts the message's section when nothing is
+ /// cached, creates a new one when neither exists, and installs the cached one when
+ /// the message has none.
+ /// </summary>
+ protected void InitMessageAnnontations()
+ {
+     if (this.messageAnnontations == null)
+     {
+         this.messageAnnontations = this.message.MessageAnnotations ?? new MessageAnnotations();
+     }
+
+     if (this.message.MessageAnnotations == null)
+     {
+         this.message.MessageAnnotations = this.messageAnnontations;
+     }
+ }
+
+ /// <summary>
+ /// Stores <paramref name="value"/> under <paramref name="key"/> in the delivery
+ /// annotations, binding the section first via InitDeliveryAnnotations().
+ /// </summary>
+ protected void SetDeliveryAnnotation(Symbol key, object value)
+ {
+ InitDeliveryAnnotations();
+ this.deliveryAnnontations[key] = value;
+ }
+
+ /// <summary>
+ /// Stores <paramref name="value"/> under <paramref name="key"/> in the message
+ /// annotations, binding the section first via InitMessageAnnontations().
+ /// </summary>
+ protected void SetMessageAnnotation(Symbol key, object value)
+ {
+ InitMessageAnnontations();
+ messageAnnontations[key] = value;
+ }
+
+ /// <summary>
+ /// Reads the message annotation stored under <paramref name="key"/>, binding the
+ /// section first via InitMessageAnnontations(). Callers in this class treat a
+ /// null result as "annotation not present".
+ /// </summary>
+ protected object GetMessageAnnotation(Symbol key)
+ {
+ InitMessageAnnontations();
+ return messageAnnontations[key];
+ }
+
+ #endregion
+
+ #region IMessageCloak Properties
+
+ // True when this cloak has an associated consumer (the consumer field is non-null).
+ public bool IsReceived { get { return consumer != null; } }
+
+ /// <summary>
+ /// Raw byte content held by this cloak. The base implementation simply stores
+ /// and returns the array reference; subclasses may override.
+ /// </summary>
+ public virtual byte[] Content
+ {
+ get
+ {
+ return content;
+ }
+
+ set
+ {
+ content = value;
+ }
+ }
+
+ public virtual bool IsBodyReadOnly { get; set; }
+
+ /// <summary>
+ /// Read-only flag for the message properties. Once the property interceptor
+ /// exists it is the source of truth for reads; writes update both the
+ /// interceptor (when present) and the locally remembered flag.
+ /// </summary>
+ public virtual bool IsPropertiesReadOnly
+ {
+     get
+     {
+         if (this.propertyHelper != null)
+         {
+             return this.propertyHelper.ReadOnly;
+         }
+         return readOnlyProperties;
+     }
+     set
+     {
+         if (this.propertyHelper != null)
+         {
+             this.propertyHelper.ReadOnly = value;
+         }
+         readOnlyProperties = value;
+     }
+ }
+
+
+ /// <summary>
+ /// Correlation id of the message. On read, a previously resolved value is
+ /// returned as-is; otherwise the id stored in the AMQP properties is fetched,
+ /// kept verbatim when it is a string and converted with
+ /// MessageSupport.CreateNMSMessageId for any other non-null type. On write,
+ /// the value is converted with MessageSupport.CreateAMQPMessageId and stored.
+ /// </summary>
+ public string NMSCorrelationID
+ {
+     get
+     {
+         if (this.correlationId == null)
+         {
+             object rawId = this.messageProperties.GetCorrelationId();
+             string text = rawId as string;
+             if (text != null)
+             {
+                 // String ids are handed to the application unchanged.
+                 this.correlationId = text;
+             }
+             else if (rawId != null)
+             {
+                 this.correlationId = MessageSupport.CreateNMSMessageId(rawId);
+             }
+         }
+
+         return this.correlationId;
+     }
+     set
+     {
+         object amqpId = MessageSupport.CreateAMQPMessageId(value);
+         this.messageProperties.SetCorrelationId(amqpId);
+         this.correlationId = value;
+     }
+ }
+
+ /// <summary>
+ /// Delivery mode mapped onto the header's Durable flag: Persistent when the
+ /// flag is set, NonPersistent otherwise.
+ /// </summary>
+ public MsgDeliveryMode NMSDeliveryMode
+ {
+     get
+     {
+         return this.messageHeader.Durable
+             ? MsgDeliveryMode.Persistent
+             : MsgDeliveryMode.NonPersistent;
+     }
+     set
+     {
+         this.messageHeader.Durable = value.Equals(MsgDeliveryMode.Persistent);
+     }
+ }
+
+ /// <summary>
+ /// Destination of the message. On read, a cached value is returned when present.
+ /// For a message with a consumer and the SymbolUtil.JMSX_OPT_DEST annotation,
+ /// the destination is built with MessageSupport.CreateDestinationFromMessage,
+ /// falling back to the consumer's destination when that yields null. On write,
+ /// the "To" property and the destination-type annotation are updated.
+ /// </summary>
+ public IDestination NMSDestination
+ {
+     get
+     {
+         if (destination != null || consumer == null)
+         {
+             return destination;
+         }
+
+         object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST);
+         if (typeObj == null)
+         {
+             // No destination-type annotation: nothing is resolved.
+             return destination;
+         }
+
+         byte type = Convert.ToByte(typeObj);
+         destination = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type);
+         if (destination == null)
+         {
+             destination = consumer.Destination;
+         }
+         return destination;
+     }
+     set
+     {
+         string address = null;
+         if (value != null)
+         {
+             address = UriUtil.GetAddress(value, Connection);
+         }
+
+         this.messageProperties.To = address;
+         SetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST, MessageSupport.GetValueForDestination(value));
+         destination = value;
+     }
+ }
+
+ /// <summary>
+ /// Message id. On read, the id stored in the AMQP properties is converted with
+ /// MessageSupport.CreateNMSMessageId the first time it is available and the
+ /// result is cached. On write, the value is converted with
+ /// MessageSupport.CreateAMQPMessageId and stored in the AMQP properties.
+ /// </summary>
+ public string NMSMessageId
+ {
+     get
+     {
+         object rawId = this.messageProperties.GetMessageId();
+         bool needsConversion = rawId != null && this.msgId == null;
+         if (needsConversion)
+         {
+             this.msgId = MessageSupport.CreateNMSMessageId(rawId);
+         }
+         return this.msgId;
+     }
+     set
+     {
+         object amqpId = MessageSupport.CreateAMQPMessageId(value);
+         this.messageProperties.SetMessageId(amqpId);
+         this.msgId = value;
+     }
+ }
+
+ /// <summary>
+ /// Message priority, stored in the AMQP header's Priority field and converted
+ /// in both directions by MessageSupport.
+ /// </summary>
+ public MsgPriority NMSPriority
+ {
+ get { return MessageSupport.GetPriorityFromValue(this.messageHeader.Priority); }
+ set
+ {
+ this.messageHeader.Priority = MessageSupport.GetValueForPriority(value);
+ }
+ }
+
+ /// <summary>
+ /// Redelivered flag. The getter forces (and remembers) true whenever the
+ /// header's DeliveryCount is greater than zero; otherwise it returns the value
+ /// last assigned through the setter.
+ /// </summary>
+ public bool NMSRedelivered
+ {
+ get
+ {
+ // A positive delivery count overrides whatever was set explicitly.
+ if (this.messageHeader.DeliveryCount > 0)
+ {
+ redelivered = true;
+ }
+ return redelivered;
+ }
+ set { redelivered = value; }
+ }
+
+ /// <summary>
+ /// Reply-to destination. On read, a cached value is returned when present.
+ /// For a received message carrying the SymbolUtil.JMSX_OPT_REPLY_TO annotation,
+ /// the destination is built with MessageSupport.CreateDestinationFromMessage.
+ /// On write, the "ReplyTo" property is updated and, for a non-null value, the
+ /// reply-to type annotation as well.
+ /// </summary>
+ public IDestination NMSReplyTo
+ {
+     get
+     {
+         if (replyTo != null || !IsReceived)
+         {
+             return replyTo;
+         }
+
+         object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO);
+         if (typeObj != null)
+         {
+             byte type = Convert.ToByte(typeObj);
+             replyTo = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type, true);
+         }
+         return replyTo;
+     }
+     set
+     {
+         string address = null;
+         if (value != null)
+         {
+             address = UriUtil.GetAddress(value, Connection);
+             // The annotation is only written for a non-null destination.
+             SetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO, MessageSupport.GetValueForDestination(value));
+         }
+
+         this.messageProperties.ReplyTo = address;
+         replyTo = value;
+     }
+ }
+
+ /// <summary>
+ /// Creation time of the message, stored in the AMQP properties' CreationTime.
+ /// Setting it also refreshes AbsoluteExpiryTime to "timestamp + time-to-live"
+ /// when a non-zero time-to-live has been assigned to this message.
+ /// </summary>
+ public DateTime NMSTimestamp
+ {
+     get { return messageProperties.CreationTime; }
+     set
+     {
+         messageProperties.CreationTime = value;
+         // TimeSpan is a struct, so it can never be null; only the zero check
+         // is meaningful. Zero means no time-to-live, so the expiry is untouched.
+         if (timeToLive != TimeSpan.Zero)
+         {
+             messageProperties.AbsoluteExpiryTime = value + timeToLive;
+         }
+     }
+ }
+
+ /// <summary>
+ /// Time-to-live of the message. A non-zero value assigned through the setter
+ /// (or derived earlier from the header) is returned directly. Otherwise the
+ /// value is derived from the wire data: from the header's Ttl (milliseconds)
+ /// when no absolute expiry time or no creation time is present, else as
+ /// "absolute expiry time - creation time".
+ /// </summary>
+ /// <remarks>
+ /// TimeSpan is a struct, so the earlier "!= null" test was always true and the
+ /// derivation below could never run; TimeSpan.Zero is used as the "not set"
+ /// marker instead.
+ /// </remarks>
+ public TimeSpan NMSTimeToLive
+ {
+     get
+     {
+         if (timeToLive != TimeSpan.Zero)
+         {
+             return timeToLive;
+         }
+
+         DateTime expiry = messageProperties.AbsoluteExpiryTime;
+         DateTime created = NMSTimestamp;
+         if (expiry == DateTime.MinValue || created == DateTime.MinValue)
+         {
+             // Without both timestamps the header's relative TTL is the only source.
+             timeToLive = TimeSpan.FromMilliseconds(Convert.ToDouble(this.messageHeader.Ttl));
+             return timeToLive;
+         }
+
+         return expiry - created;
+     }
+     set
+     {
+         timeToLive = value;
+     }
+ }
+
+ /// <summary>
+ /// NMS type of the message; reads and writes the AMQP properties' Subject field.
+ /// </summary>
+ public string NMSType
+ {
+ get { return this.messageProperties.Subject; }
+ set { this.messageProperties.Subject = value; }
+ }
+
+ /// <summary>
+ /// Application properties of the message. On first access the application
+ /// properties section is bound, wrapped in an AMQPPrimitiveMap and then in a
+ /// MessagePropertyIntercepter created with the current read-only flag; the
+ /// interceptor is what callers receive.
+ /// </summary>
+ public IPrimitiveMap Properties
+ {
+ get
+ {
+ if (properties == null)
+ {
+ InitApplicationProperties();
+ properties = new AMQPPrimitiveMap(this.applicationProperties);
+ propertyHelper = new MessagePropertyIntercepter(this, properties, readOnlyProperties);
+ }
+ // The interceptor, not the raw map, is returned.
+ return propertyHelper;
+ }
+ }
+
+ /// <summary>
+ /// The header's DeliveryCount exposed as an int; values are converted with
+ /// Convert.ToInt32 / Convert.ToUInt32 in the getter and setter respectively.
+ /// </summary>
+ public int DeliveryCount
+ {
+ get
+ {
+ return Convert.ToInt32(this.messageHeader.DeliveryCount);
+ }
+
+ set
+ {
+ this.messageHeader.DeliveryCount = Convert.ToUInt32(value);
+ }
+ }
+
+ /// <summary>
+ /// DeliveryCount shifted by one: the getter returns DeliveryCount - 1 and the
+ /// setter stores value + 1 into DeliveryCount.
+ /// </summary>
+ public int RedeliveryCount
+ {
+ get
+ {
+ // NOTE(review): yields -1 while DeliveryCount is 0 - confirm callers expect that.
+ return DeliveryCount - 1;
+ }
+
+ set
+ {
+ DeliveryCount = value + 1;
+ }
+ }
+
+ public MessageAcknowledgementHandler AckHandler { get; set; }
+
+ /// <summary>
+ /// Acknowledges the message through its AckHandler and then clears the handler,
+ /// so a second call does nothing. Without a handler the call is a no-op.
+ /// </summary>
+ /// <exception cref="IllegalStateException">
+ /// A handler is set but the connection is closed.
+ /// </exception>
+ public void Acknowledge()
+ {
+     if (AckHandler == null)
+     {
+         return;
+     }
+
+     if (connection.IsClosed)
+     {
+         throw new IllegalStateException("Can not acknowledge Message on closed connection.");
+     }
+
+     AckHandler.Acknowledge();
+     AckHandler = null;
+ }
+
+ /// <summary>Clears the body by setting Content to null.</summary>
+ public virtual void ClearBody()
+ {
+ Content = null;
+ }
+
+ /// <summary>
+ /// Clears the application properties through the property interceptor. Does
+ /// nothing when the Properties map has not been created yet.
+ /// </summary>
+ public virtual void ClearProperties()
+ {
+ // properties and propertyHelper are created together in the Properties getter.
+ if (properties != null)
+ {
+ propertyHelper.Clear();
+ }
+ }
+
+ /// <summary>
+ /// Creates a new cloak of the concrete type matching JMSMessageType on the same
+ /// connection, fills it through CopyInto() and returns it.
+ /// </summary>
+ /// <exception cref="NMSException">JMSMessageType is not one of the known type codes.</exception>
+ public virtual IMessageCloak Copy()
+ {
+ IMessageCloak copy = null;
+ switch(JMSMessageType)
+ {
+ case MessageSupport.JMS_TYPE_MSG:
+ copy = new AMQPMessageCloak(connection);
+ break;
+ case MessageSupport.JMS_TYPE_BYTE:
+ copy = new AMQPBytesMessageCloak(connection);
+ break;
+ case MessageSupport.JMS_TYPE_TXT:
+ copy = new AMQPTextMessageCloak(connection);
+ break;
+ case MessageSupport.JMS_TYPE_MAP:
+ copy = new AMQPMapMessageCloak(connection);
+ break;
+ case MessageSupport.JMS_TYPE_STRM:
+ copy = new AMQPStreamMessageCloak(connection);
+ break;
+ case MessageSupport.JMS_TYPE_OBJ:
+ // Assumes a cloak reporting JMS_TYPE_OBJ is an AMQPObjectMessageCloak; the
+ // 'as' result is dereferenced without a null check.
+ copy = new AMQPObjectMessageCloak(connection, (this as AMQPObjectMessageCloak).Type);
+ break;
+ default:
+ throw new NMSException("Fatal error Invalid JMS type.");
+ }
+
+ CopyInto(copy);
+ return copy;
+ }
+
+ /// <summary>
+ /// String-keyed overload: converts <paramref name="symbolKey"/> to a Symbol and
+ /// forwards to the Symbol-keyed GetMessageAnnotation.
+ /// </summary>
+ public object GetMessageAnnotation(string symbolKey)
+ {
+ Symbol sym = symbolKey;
+ return GetMessageAnnotation(sym);
+ }
+
+ /// <summary>
+ /// String-keyed overload: converts <paramref name="symbolKey"/> to a Symbol and
+ /// forwards to the Symbol-keyed SetMessageAnnotation.
+ /// </summary>
+ public void SetMessageAnnotation(string symbolKey, object value)
+ {
+ Symbol sym = symbolKey;
+ SetMessageAnnotation(sym, value);
+ }
+
+ /// <summary>
+ /// Reads the delivery annotation stored under <paramref name="symbolKey"/>.
+ /// </summary>
+ /// <param name="symbolKey">Annotation key; converted to a Symbol for the lookup.</param>
+ /// <returns>The annotation value as returned by the delivery annotations indexer.</returns>
+ public object GetDeliveryAnnotation(string symbolKey)
+ {
+     Symbol sym = symbolKey;
+     // This class has no Symbol-keyed GetDeliveryAnnotation overload. Because
+     // Symbol converts implicitly to string, "GetDeliveryAnnotation(sym)" bound
+     // back to this very method and recursed without end. Read the section
+     // directly, mirroring the Symbol-keyed SetDeliveryAnnotation.
+     InitDeliveryAnnotations();
+     return this.deliveryAnnontations[sym];
+ }
+
+ /// <summary>
+ /// String-keyed overload: converts <paramref name="symbolKey"/> to a Symbol and
+ /// forwards to the Symbol-keyed SetDeliveryAnnotation.
+ /// </summary>
+ public void SetDeliveryAnnotation(string symbolKey, object value)
+ {
+ Symbol sym = symbolKey;
+ SetDeliveryAnnotation(sym, value);
+ }
+
+ /// <summary>
+ /// Returns the content type as a string; the Symbol from GetContentTypeSymbol()
+ /// is converted implicitly.
+ /// </summary>
+ public string GetContentType()
+ {
+ return GetContentTypeSymbol();
+ }
+
+ /// <summary>
+ /// Sets the content type from a string by wrapping it in a Symbol and
+ /// forwarding to the Symbol-typed SetContentType.
+ /// </summary>
+ public void SetContentType(string type)
+ {
+ SetContentType(new Symbol(type));
+ }
+
+ /// <summary>Returns the ContentType stored in the AMQP message properties.</summary>
+ protected virtual Symbol GetContentTypeSymbol()
+ {
+ return this.messageProperties.ContentType;
+ }
+
+ /// <summary>Stores <paramref name="type"/> as the ContentType of the AMQP message properties.</summary>
+ protected virtual void SetContentType(Symbol type)
+ {
+ this.messageProperties.ContentType = type;
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Builds a diagnostic description: the runtime type, the wrapped AMQP message
+ /// (via the static ToString overload) and every public, non-indexed property
+ /// with a public getter, discovered through reflection. IPrimitiveMap values
+ /// are rendered with ConversionSupport.ToString.
+ /// </summary>
+ /// <remarks>
+ /// A getter that throws is logged through Tracer and skipped rather than
+ /// failing the whole call.
+ /// </remarks>
+ public override string ToString()
+ {
+     const string separator = ",\n";
+     StringBuilder text = new StringBuilder();
+     text.AppendFormat("{0}:\n", this.GetType());
+     text.AppendFormat("inner amqp message: \n{0}\n", AMQPMessageCloak.ToString(message));
+     text.Append("NMS Fields = [\n");
+
+     bool appendedAny = false;
+     foreach (MemberInfo info in this.GetType().GetMembers())
+     {
+         PropertyInfo prop = info as PropertyInfo;
+         if (prop == null)
+         {
+             continue;
+         }
+
+         // Write-only properties have no getter, and indexers cannot be read
+         // without arguments; both are skipped instead of throwing.
+         MethodInfo getter = prop.GetGetMethod(true);
+         if (getter == null || !getter.IsPublic || prop.GetIndexParameters().Length > 0)
+         {
+             continue;
+         }
+
+         try
+         {
+             object val = prop.GetValue(this, null);
+             IPrimitiveMap asMap = val as IPrimitiveMap;
+             if (asMap != null)
+             {
+                 text.Append(prop.Name).Append(" = ").Append(ConversionSupport.ToString(asMap)).Append(separator);
+             }
+             else
+             {
+                 text.AppendFormat("{0} = {1}", prop.Name, val).Append(separator);
+             }
+             appendedAny = true;
+         }
+         catch (TargetInvocationException tie)
+         {
+             Tracer.InfoFormat("Failed to invoke Member field accessor: {0}, cause: {1}", prop.Name, tie);
+         }
+     }
+
+     if (appendedAny)
+     {
+         // Drop the separator after the last field; when nothing was appended
+         // the opening "[\n" must stay intact.
+         text.Length -= separator.Length;
+         text.Append("\n]");
+     }
+     else
+     {
+         text.Append("]");
+     }
+     return text.ToString();
+ }
+
+ /// <summary>
+ /// Describes an AMQP message: the literal "null" for a null message, otherwise
+ /// its type name followed by the header's ToString() when a header is present.
+ /// </summary>
+ public static string ToString(Amqp.Message message)
+ {
+     if (message == null)
+     {
+         return "null";
+     }
+
+     string headerText = message.Header != null
+         ? "Message Header: " + message.Header.ToString() + "\n"
+         : string.Empty;
+     return "Type=" + message.GetType().Name + ":\n" + headerText;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs b/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs
new file mode 100644
index 0000000..454b8e8
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Message.Factory;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+ /// <summary>
+ /// MessageTransformation implementation that creates messages through an
+ /// AMQPMessageFactory and transforms destinations for the factory's parent
+ /// connection.
+ /// </summary>
+ class AMQPMessageTransformation <T> : MessageTransformation where T:ConnectionInfo
+ {
+ // Connection taken from the factory's Parent; used for destination transformation.
+ protected readonly Connection connection;
+ // Factory that every DoCreate* override delegates to.
+ protected readonly MessageFactory<T> factory;
+
+ /// <summary>
+ /// Stores the factory and its parent connection.
+ /// </summary>
+ public AMQPMessageTransformation(AMQPMessageFactory<T> fact) : base()
+ {
+ connection = fact.Parent;
+ factory = fact;
+ }
+
+ /// <summary>Creates a bytes message through the factory.</summary>
+ protected override IBytesMessage DoCreateBytesMessage()
+ {
+ return factory.CreateBytesMessage();
+ }
+
+ /// <summary>Creates a map message through the factory.</summary>
+ protected override IMapMessage DoCreateMapMessage()
+ {
+ return factory.CreateMapMessage();
+ }
+
+ /// <summary>Creates a plain message through the factory.</summary>
+ protected override IMessage DoCreateMessage()
+ {
+ return factory.CreateMessage();
+ }
+
+ /// <summary>Creates an object message through the factory, passing null as its body.</summary>
+ protected override IObjectMessage DoCreateObjectMessage()
+ {
+ return factory.CreateObjectMessage(null);
+ }
+
+ /// <summary>Creates a stream message through the factory.</summary>
+ protected override IStreamMessage DoCreateStreamMessage()
+ {
+ return factory.CreateStreamMessage();
+ }
+
+ /// <summary>Creates a text message through the factory.</summary>
+ protected override ITextMessage DoCreateTextMessage()
+ {
+ return factory.CreateTextMessage();
+ }
+
+ /// <summary>Post-processing hook; intentionally empty in this implementation.</summary>
+ protected override void DoPostProcessMessage(IMessage message)
+ {
+ // nothing for now
+ }
+
+ /// <summary>
+ /// Transforms <paramref name="destination"/> with
+ /// DestinationTransformation.Transform for this transformation's connection.
+ /// </summary>
+ protected override IDestination DoTransformDestination(IDestination destination)
+ {
+ return DestinationTransformation.Transform(connection, destination);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs
new file mode 100644
index 0000000..49b6a03
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Framing;
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+ using Amqp;
+ using Cloak;
+ using Util;
+ using Util.Types;
+
+ /// <summary>
+ /// AMQP cloak for NMS object messages. Body access is delegated to an
+ /// IAMQPObjectSerializer selected from an AMQPObjectEncodingType.
+ /// </summary>
+ class AMQPObjectMessageCloak : AMQPMessageCloak, IObjectMessageCloak
+ {
+
+ public static AMQPObjectEncodingType DEFAULT_ENCODING_TYPE = AMQPObjectEncodingType.AMQP_TYPE;
+
+ private IAMQPObjectSerializer objectSerializer;
+
+ #region Constructors
+ /// <summary>
+ /// Creates a cloak for connection <paramref name="c"/> using the serializer for
+ /// <paramref name="type"/>, then sets the body to null through that serializer.
+ /// </summary>
+ internal AMQPObjectMessageCloak(Apache.NMS.AMQP.Connection c, AMQPObjectEncodingType type) : base(c)
+ {
+ InitializeObjectSerializer(type);
+ Body = null;
+ }
+
+ /// <summary>
+ /// Wraps <paramref name="message"/> received by <paramref name="mc"/>, choosing the
+ /// serializer from the message's content type and message annotations.
+ /// </summary>
+ internal AMQPObjectMessageCloak(MessageConsumer mc, Amqp.Message message) : base(mc, message)
+ {
+ InitializeObjectSerializer(DetectEncodingType(message));
+ }
+
+ #endregion
+
+ #region Internal Properties Fields
+ internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_OBJ; } }
+ #endregion
+
+ #region Public IObjectMessageCloak Properties
+ /// <summary>Encoding type reported by the active serializer.</summary>
+ public AMQPObjectEncodingType Type { get { return this.objectSerializer.Type; } }
+
+ /// <summary>Object body, read and written through the active serializer.</summary>
+ public object Body
+ {
+ get
+ {
+ return this.objectSerializer.GetObject();
+ }
+ set
+ {
+ this.objectSerializer.SetObject(value);
+ }
+ }
+
+ /// <summary>
+ /// Raw content is not exposed for object messages: the getter always returns null
+ /// and the setter ignores its value.
+ /// </summary>
+ public override byte[] Content
+ {
+ get
+ {
+ return null;
+ }
+ set
+ {
+
+ }
+ }
+
+ #endregion
+
+ #region IMessageCloak Copy Methods
+
+ /// <summary>
+ /// Creates a new cloak on the same connection with the same encoding type and
+ /// copies this message into it.
+ /// </summary>
+ IObjectMessageCloak IObjectMessageCloak.Copy()
+ {
+ IObjectMessageCloak ocloak = new AMQPObjectMessageCloak(connection, this.objectSerializer.Type);
+ this.CopyInto(ocloak);
+ return ocloak;
+ }
+
+
+ /// <summary>
+ /// Copies this message into <paramref name="msg"/>. When the target is an object
+ /// message cloak, the body is copied as well.
+ /// </summary>
+ protected override void CopyInto(IMessageCloak msg)
+ {
+ base.CopyInto(msg);
+ if (msg is IObjectMessageCloak)
+ {
+ IObjectMessageCloak copy = msg as IObjectMessageCloak;
+ if (copy is AMQPObjectMessageCloak)
+ {
+ this.objectSerializer.CopyInto((copy as AMQPObjectMessageCloak).objectSerializer);
+ }
+ else
+ {
+ // The data flows from this message INTO the copy. The previous code
+ // assigned the copy's body to this message, i.e. the wrong direction.
+ copy.Body = this.Body;
+ }
+ }
+
+ }
+
+ #endregion
+
+ #region Private Methods
+
+ /// <summary>
+ /// Determines the object encoding of a received message. Serialized content type
+ /// plus a true JMS_JAVA_ENCODING annotation selects JAVA_SERIALIZABLE, serialized
+ /// content type alone selects DOTNET_SERIALIZABLE, anything else AMQP_TYPE.
+ /// A missing properties section, content type, annotation section or annotation
+ /// value is treated as "not set" instead of raising NullReferenceException.
+ /// </summary>
+ private static AMQPObjectEncodingType DetectEncodingType(Amqp.Message message)
+ {
+ var contentType = message.Properties?.ContentType;
+ bool isSerialized = (object)contentType != null
+ && contentType.Equals(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ if (!isSerialized)
+ {
+ return AMQPObjectEncodingType.AMQP_TYPE;
+ }
+
+ var annotations = message.MessageAnnotations;
+ if (annotations != null && annotations.Map.ContainsKey(MessageSupport.JMS_JAVA_ENCODING))
+ {
+ object javaEncoding = annotations.Map[MessageSupport.JMS_JAVA_ENCODING];
+ if (javaEncoding != null && javaEncoding.Equals(SymbolUtil.BOOLEAN_TRUE))
+ {
+ return AMQPObjectEncodingType.JAVA_SERIALIZABLE;
+ }
+ }
+ return AMQPObjectEncodingType.DOTNET_SERIALIZABLE;
+ }
+
+ /// <summary>
+ /// Instantiates the serializer matching <paramref name="type"/>; an unknown value
+ /// raises the NMS exception created from an ArgumentException.
+ /// </summary>
+ private void InitializeObjectSerializer(AMQPObjectEncodingType type)
+ {
+ switch (type)
+ {
+ case AMQPObjectEncodingType.AMQP_TYPE:
+ objectSerializer = new AMQPTypeSerializer(this);
+ break;
+ case AMQPObjectEncodingType.DOTNET_SERIALIZABLE:
+ objectSerializer = new DotnetObjectSerializer(this);
+ break;
+ case AMQPObjectEncodingType.JAVA_SERIALIZABLE:
+ objectSerializer = new JavaObjectSerializer(this);
+ break;
+ default:
+ throw NMSExceptionSupport.Create(new ArgumentException("Unsupported object encoding."));
+ }
+ }
+
+ #endregion
+ }
+
+ #region IAMQPObjectSerializer
+
+ #region IAMQPObjectSerializer Interface
+ /// <summary>
+ /// Strategy used by AMQPObjectMessageCloak to read and write the object body of an
+ /// AMQP message for one particular AMQPObjectEncodingType.
+ /// </summary>
+ internal interface IAMQPObjectSerializer
+ {
+ /// <summary>The AMQP message whose body this serializer operates on.</summary>
+ Amqp.Message Message { get; }
+ /// <summary>Stores <paramref name="o"/> as the message body.</summary>
+ void SetObject(object o);
+ /// <summary>Returns the object currently held in the message body.</summary>
+ object GetObject();
+
+ /// <summary>Copies this serializer's object into <paramref name="serializer"/>.</summary>
+ void CopyInto(IAMQPObjectSerializer serializer);
+
+ /// <summary>The encoding type this serializer implements.</summary>
+ AMQPObjectEncodingType Type { get; }
+
+ }
+
+ #endregion
+
+ #region AMQP Type IAMQPObjectSerializer Implementation
+ /// <summary>
+ /// Serializer for AMQPObjectEncodingType.AMQP_TYPE: the object is kept in the
+ /// message body as an AMQP value rather than as serialized bytes.
+ /// </summary>
+ class AMQPTypeSerializer : IAMQPObjectSerializer
+ {
+
+ private readonly Amqp.Message amqpMessage;
+ private readonly AMQPObjectMessageCloak message;
+
+ /// <summary>
+ /// Attaches to the AMQP message of <paramref name="msg"/> and sets the
+ /// JMS_AMQP_TYPE_ENCODING message annotation to true on it.
+ /// </summary>
+ internal AMQPTypeSerializer(AMQPObjectMessageCloak msg)
+ {
+ amqpMessage = msg.AMQPMessage;
+ message = msg;
+ msg.SetMessageAnnotation(MessageSupport.JMS_AMQP_TYPE_ENCODING, SymbolUtil.BOOLEAN_TRUE);
+ }
+
+ public Message Message => amqpMessage;
+
+ public AMQPObjectEncodingType Type => AMQPObjectEncodingType.AMQP_TYPE;
+
+ /// <summary>Sets the target serializer's object to this serializer's current object.</summary>
+ public void CopyInto(IAMQPObjectSerializer serializer)
+ {
+ object current = GetObject();
+ serializer.SetObject(current);
+ }
+
+ /// <summary>
+ /// Returns the body content: the value of an amqp-value section, the binary of a
+ /// data section, the list of an amqp-sequence section, or null when there is no body.
+ /// Any other section type raises IllegalStateException.
+ /// </summary>
+ public object GetObject()
+ {
+ RestrictedDescribed section = amqpMessage.BodySection;
+ if (section == null)
+ {
+ return null;
+ }
+
+ AmqpValue amqpValue = section as AmqpValue;
+ if (amqpValue != null)
+ {
+ return amqpValue.Value;
+ }
+
+ Data data = section as Data;
+ if (data != null)
+ {
+ return data.Binary;
+ }
+
+ AmqpSequence sequence = section as AmqpSequence;
+ if (sequence != null)
+ {
+ return sequence.List;
+ }
+
+ throw new IllegalStateException("Unexpected body type: " + section.GetType().Name);
+ }
+
+ /// <summary>
+ /// Stores <paramref name="o"/> as an amqp-value body. Null becomes the shared null
+ /// body; unsupported types raise ArgumentException.
+ /// </summary>
+ public void SetObject(object o)
+ {
+ if (o == null)
+ {
+ amqpMessage.BodySection = MessageSupport.NULL_AMQP_VALUE_BODY;
+ return;
+ }
+
+ if (!IsNMSObjectTypeSupported(o))
+ {
+ throw new ArgumentException("Encoding unexpected object type: " + o.GetType().Name);
+ }
+
+ // Lists and NMS primitive maps are converted to their AMQP form first;
+ // everything else is stored as given.
+ object amqpForm;
+ if (o is IList)
+ {
+ amqpForm = ConversionSupport.ListToAmqp(o as IList);
+ }
+ else if (o is IPrimitiveMap)
+ {
+ amqpForm = ConversionSupport.NMSMapToAmqp(o as IPrimitiveMap);
+ }
+ else
+ {
+ amqpForm = o;
+ }
+
+ // Encode and decode a scratch message so the stored body is a copy of the
+ // caller's object rather than a reference to it.
+ Amqp.Message scratch = new Amqp.Message(amqpForm);
+ ByteBuffer encoded = scratch.Encode();
+ Amqp.Message decoded = Message.Decode(encoded);
+
+ AmqpValue bodyValue = new AmqpValue();
+ bodyValue.Value = decoded.Body;
+ amqpMessage.BodySection = bodyValue;
+ }
+
+ // True when o is an NMS type, an AMQP List/Map, an NMS primitive map or an IList.
+ private bool IsNMSObjectTypeSupported(object o)
+ {
+ if (ConversionSupport.IsNMSType(o))
+ {
+ return true;
+ }
+ return o is List || o is Map || o is IPrimitiveMap || o is IList;
+ }
+ }
+
+ #endregion
+
+ #region Dotnet Serializable IAMQPObjectSerializer Implementation
+
+ /// <summary>
+ /// Serializer for AMQPObjectEncodingType.DOTNET_SERIALIZABLE: the object is stored in a
+ /// data body section as BinaryFormatter output.
+ /// </summary>
+ /// <remarks>
+ /// NOTE(review): GetObject runs BinaryFormatter.Deserialize on bytes taken from the
+ /// message body. BinaryFormatter is unsafe on untrusted input; only consume such
+ /// messages from trusted peers, or replace the formatter.
+ /// </remarks>
+ class DotnetObjectSerializer : IAMQPObjectSerializer
+ {
+ // Bytes written for a null object: Java serialization stream header
+ // (magic 0xACED, version 0x0005) followed by TC_NULL (0x70).
+ private static readonly byte[] SerializedNull = { 0xac, 0xed, 0x00, 0x05, 0x70 };
+
+ private readonly Amqp.Message amqpMessage;
+ private readonly AMQPObjectMessageCloak message;
+
+ /// <summary>
+ /// Attaches to the AMQP message of <paramref name="msg"/>, sets its content type to
+ /// the serialized-object content type and sets the JMS_DONET_ENCODING annotation to true.
+ /// </summary>
+ internal DotnetObjectSerializer(AMQPObjectMessageCloak msg)
+ {
+ amqpMessage = msg.AMQPMessage;
+ message = msg;
+ msg.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ msg.SetMessageAnnotation(MessageSupport.JMS_DONET_ENCODING, SymbolUtil.BOOLEAN_TRUE);
+ }
+
+ public Message Message { get { return amqpMessage; } }
+
+ public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.DOTNET_SERIALIZABLE; } }
+
+ /// <summary>Sets the target serializer's object to this serializer's current object.</summary>
+ public void CopyInto(IAMQPObjectSerializer serializer)
+ {
+ serializer.SetObject(GetObject());
+ }
+
+ /// <summary>
+ /// Deserializes the data body. Returns null when there is no body, the body is not
+ /// a data section, the binary is empty, or the binary is the null marker that
+ /// SetObject(null) writes.
+ /// </summary>
+ public object GetObject()
+ {
+ // TODO handle other body types.
+ Data data = Message.BodySection as Data;
+ byte[] bin = data != null ? data.Binary : null;
+
+ // The null marker is not BinaryFormatter output, so it must be recognised
+ // here instead of being passed to the formatter.
+ if (bin == null || bin.Length == 0 || IsSerializedNull(bin))
+ {
+ return null;
+ }
+ return GetDeserializedObject(bin);
+ }
+
+ /// <summary>
+ /// Serializes <paramref name="o"/> into a data body; an empty result is stored as
+ /// the shared empty data section.
+ /// </summary>
+ public void SetObject(object o)
+ {
+ byte[] bin = GetSerializedObject(o);
+ if (bin == null || bin.Length == 0)
+ {
+ amqpMessage.BodySection = MessageSupport.EMPTY_DATA;
+ }
+ else
+ {
+ amqpMessage.BodySection = new Data() { Binary = bin };
+ }
+ }
+
+ // True when binary is byte-for-byte the null marker.
+ private static bool IsSerializedNull(byte[] binary)
+ {
+ if (binary.Length != SerializedNull.Length)
+ {
+ return false;
+ }
+ for (int i = 0; i < binary.Length; i++)
+ {
+ if (binary[i] != SerializedNull[i])
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Runs BinaryFormatter over the given bytes; the stream is disposed on every path.
+ private object GetDeserializedObject(byte[] binary)
+ {
+ using (MemoryStream stream = new MemoryStream(binary))
+ {
+ IFormatter formatter = new BinaryFormatter();
+ return formatter.Deserialize(stream);
+ }
+ }
+
+ // Returns BinaryFormatter output for o, or a fresh copy of the null marker for null.
+ private byte[] GetSerializedObject(object o)
+ {
+ if (o == null)
+ {
+ // Clone so the caller's array never aliases the shared marker.
+ return (byte[])SerializedNull.Clone();
+ }
+
+ using (MemoryStream stream = new MemoryStream())
+ {
+ IFormatter formatter = new BinaryFormatter();
+ formatter.Serialize(stream, o);
+ return stream.ToArray();
+ }
+ }
+ }
+
+ #endregion
+
+ #region Java Serializable IAMQPObjectSerializer Implementation
+
+ /// <summary>
+ /// Placeholder serializer for AMQPObjectEncodingType.JAVA_SERIALIZABLE. It marks the
+ /// message as a Java-serialized object, but reading or writing the body is not
+ /// implemented and always throws NotImplementedException.
+ /// </summary>
+ class JavaObjectSerializer : IAMQPObjectSerializer
+ {
+ private readonly Amqp.Message amqpMessage;
+ private readonly AMQPObjectMessageCloak message;
+
+ /// <summary>
+ /// Attaches to the AMQP message of <paramref name="msg"/>, sets its content type to
+ /// the serialized-object content type and sets the JMS_JAVA_ENCODING annotation to true.
+ /// </summary>
+ internal JavaObjectSerializer(AMQPObjectMessageCloak msg)
+ {
+ this.amqpMessage = msg.AMQPMessage;
+ this.message = msg;
+ this.message.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ this.message.SetMessageAnnotation(MessageSupport.JMS_JAVA_ENCODING, SymbolUtil.BOOLEAN_TRUE);
+ }
+
+ public Message Message => amqpMessage;
+
+ public AMQPObjectEncodingType Type => AMQPObjectEncodingType.JAVA_SERIALIZABLE;
+
+ /// <summary>
+ /// Reads this serializer's object and sets it on <paramref name="serializer"/>;
+ /// currently always throws because GetObject is not implemented.
+ /// </summary>
+ public void CopyInto(IAMQPObjectSerializer serializer)
+ {
+ // TODO fix to copy java serialized object as binary.
+ object body = GetObject();
+ serializer.SetObject(body);
+ }
+
+ /// <summary>Not supported; always throws NotImplementedException.</summary>
+ public object GetObject()
+ {
+ throw new NotImplementedException("Java Serialized Object body Not Supported.");
+ }
+
+ /// <summary>Not supported; always throws NotImplementedException.</summary>
+ public void SetObject(object o)
+ {
+ throw new NotImplementedException("Java Serialized Object body Not Supported.");
+ }
+ }
+
+ #endregion // Java IAMQPObjectSerializer Impl
+
+ #endregion // IAMQPObjectSerializer
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs
new file mode 100644
index 0000000..27dad32
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Framing;
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+
+ using Cloak;
+ using Util;
+ using Factory;
+ /// <summary>
+ /// AMQP cloak for NMS stream messages. The stream is an IList held in the message
+ /// body (amqp-sequence or amqp-value section) plus a read/write position into it.
+ /// </summary>
+ class AMQPStreamMessageCloak : AMQPMessageCloak, IStreamMessageCloak
+ {
+ private IList list;
+ private int position = 0;
+
+ /// <summary>Creates a cloak with an empty amqp-sequence body.</summary>
+ internal AMQPStreamMessageCloak(Connection c):base(c)
+ {
+ list = InitializeEmptyBody(true);
+ }
+
+ /// <summary>
+ /// Wraps a received message. A missing body or null sequence list becomes an empty
+ /// amqp-sequence; a null amqp-value becomes an empty amqp-value list; an amqp-value
+ /// that is not a list, or any other section type, raises IllegalStateException.
+ /// </summary>
+ internal AMQPStreamMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg)
+ {
+ if (msg.BodySection == null)
+ {
+ list = InitializeEmptyBody(true);
+ }
+ else if (msg.BodySection is AmqpSequence)
+ {
+ IList value = (msg.BodySection as AmqpSequence).List;
+ if(value == null)
+ {
+ list = InitializeEmptyBody(true);
+ }
+ else
+ {
+ list = value;
+ }
+ }
+ else if (msg.BodySection is AmqpValue)
+ {
+ object value = (msg.BodySection as AmqpValue).Value;
+ if(value == null)
+ {
+ list = InitializeEmptyBody(false);
+ }
+ else if (value is IList)
+ {
+ list = value as IList;
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected amqp-value body content type: " + value.GetType().Name);
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected message body type: " + msg.BodySection.GetType().Name);
+ }
+ }
+
+ internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_STRM; } }
+
+ #region Private Methods
+
+ // Installs a new empty list as the message body, wrapped in an amqp-sequence when
+ // isSequence is true and in an amqp-value otherwise, and returns that list.
+ private List InitializeEmptyBody(bool isSequence)
+ {
+ List l = new List();
+ if (isSequence)
+ {
+ AmqpSequence seq = new Amqp.Framing.AmqpSequence();
+ message.BodySection = seq;
+ seq.List = l;
+
+ }
+ else
+ {
+ Amqp.Framing.AmqpValue val = new Amqp.Framing.AmqpValue();
+ val.Value = l;
+ message.BodySection = val;
+ }
+ return l;
+ }
+
+ private bool IsEmpty { get { return list.Count <= 0; } }
+
+ // Returns a private copy of byte[] values so stream entries never share storage
+ // with caller-owned arrays; every other value is returned unchanged.
+ private static object CopyIfBinary(object value)
+ {
+ byte[] binary = value as byte[];
+ if (binary == null)
+ {
+ return value;
+ }
+ byte[] duplicate = new byte[binary.Length];
+ binary.CopyTo(duplicate, 0);
+ return duplicate;
+ }
+
+ #endregion
+
+ #region IStreamMessageCloak Methods
+ /// <summary>True while the position is before the end of a non-empty list.</summary>
+ public bool HasNext { get { return !IsEmpty && position < list.Count; } }
+
+ /// <summary>
+ /// Returns the element at the current position without advancing; byte arrays are
+ /// returned as copies. Throws EndOfStreamException at the end of the list.
+ /// </summary>
+ public object Peek()
+ {
+ if(IsEmpty || position >= list.Count)
+ {
+ throw new EndOfStreamException("Attempt to read past the end of stream");
+ }
+ return CopyIfBinary(list[position]);
+ }
+
+ /// <summary>
+ /// Advances past the current element. Throws EndOfStreamException when there is no
+ /// current element, using the same bound as Peek.
+ /// </summary>
+ public void Pop()
+ {
+ // ">=" rather than ">": position == Count means nothing is left to pop.
+ if (IsEmpty || position >= list.Count)
+ {
+ throw new EndOfStreamException("Attempt to read past the end of stream");
+ }
+ position++;
+ }
+
+ /// <summary>
+ /// Appends <paramref name="value"/> (byte arrays are copied first) and increments
+ /// the position by one.
+ /// </summary>
+ public void Put(object value)
+ {
+ object entry = CopyIfBinary(value);
+ if (list.Add(entry) < 0)
+ {
+ // entry may legitimately be null, so do not call ToString() on it directly.
+ throw NMSExceptionSupport.Create(string.Format("Failed to add {0} to stream.", entry ?? "null"), null);
+ }
+ position++;
+ }
+
+ /// <summary>Moves the position back to the first element.</summary>
+ public void Reset()
+ {
+ position = 0;
+ }
+
+ /// <summary>Clears the base body, empties the list and rewinds the position.</summary>
+ public override void ClearBody()
+ {
+ // NOTE(review): base.ClearBody() is not visible here. If it replaces the
+ // message's body section, "list" would no longer be the list held by the
+ // message - confirm against AMQPMessageCloak.
+ base.ClearBody();
+ list.Clear();
+ position = 0;
+ }
+
+ IStreamMessageCloak IStreamMessageCloak.Copy()
+ {
+ return base.Copy() as IStreamMessageCloak;
+ }
+
+ /// <summary>
+ /// Copies this message into <paramref name="msg"/>; when the target is a stream
+ /// cloak every element is appended to it with Put.
+ /// </summary>
+ protected override void CopyInto(IMessageCloak msg)
+ {
+ base.CopyInto(msg);
+ if(msg is IStreamMessageCloak)
+ {
+ IStreamMessageCloak copy = (msg as IStreamMessageCloak);
+
+ foreach(object o in list)
+ {
+ copy.Put(o);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Base description followed by one line per stream element; byte arrays are
+ /// rendered as hex and null elements as "null".
+ /// </summary>
+ public override string ToString()
+ {
+ StringBuilder text = new StringBuilder(base.ToString());
+ text.Append("\nMessage Body: {");
+ foreach(object o in list)
+ {
+ //
+ // handle byte arrays for now
+ // add more special handlers as needed.
+ //
+ byte[] binary = o as byte[];
+ if (o == null)
+ {
+ // Put accepts null entries; GetType()/ToString() on them would throw.
+ text.Append("\nnull");
+ }
+ else if (binary != null)
+ {
+ text.AppendFormat("\n{0} len={1}: {2}", o.GetType(), binary.Length, BitConverter.ToString(binary).Replace("-", " "));
+ }
+ else
+ {
+ text.AppendFormat("\n{0}: {1}", o.GetType(), o.ToString());
+ }
+ }
+ text.Append("\n}");
+ return text.ToString();
+ }
+ #endregion
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs
new file mode 100644
index 0000000..dd093bc
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+ using Cloak;
+ using Util;
+ using Factory;
+
+ /// <summary>
+ /// AMQP cloak for NMS text messages. Text is written as an amqp-value string and read
+ /// from either an amqp-value (string or UTF-8 bytes) or a data section (UTF-8 bytes).
+ /// </summary>
+ class AMQPTextMessageCloak : AMQPMessageCloak, ITextMessageCloak
+ {
+ #region Constructor
+
+ internal AMQPTextMessageCloak(Connection c) : base(c) {}
+
+ internal AMQPTextMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg) {}
+
+ #endregion
+
+ internal override byte JMSMessageType
+ {
+ get
+ {
+ return MessageSupport.JMS_TYPE_TXT;
+ }
+ }
+
+ /// <summary>
+ /// Message text. The getter decodes the current body (empty string when there is no
+ /// body or the value is null); the setter replaces the body with an amqp-value
+ /// holding the given string.
+ /// </summary>
+ public string Text
+ {
+ get
+ {
+ return GetTextFromBody();
+ }
+
+ set
+ {
+ AmqpValue val = new AmqpValue();
+ val.Value = value;
+ this.message.BodySection = val;
+ }
+ }
+
+ /// <summary>Creates a new text cloak on the same connection and copies this message into it.</summary>
+ ITextMessageCloak ITextMessageCloak.Copy()
+ {
+ ITextMessageCloak tcloak = new AMQPTextMessageCloak(connection);
+ CopyInto(tcloak);
+ return tcloak;
+ }
+
+ /// <summary>
+ /// Copies this message into <paramref name="msg"/>; the text is copied only when the
+ /// target is a text cloak.
+ /// </summary>
+ protected override void CopyInto(IMessageCloak msg)
+ {
+ base.CopyInto(msg);
+ // Check the cast before use, matching the other cloaks' CopyInto overrides,
+ // so a non-text target does not raise NullReferenceException.
+ ITextMessageCloak copy = msg as ITextMessageCloak;
+ if (copy != null)
+ {
+ copy.Text = Text;
+ }
+ }
+
+ // Decodes body as UTF-8; null or empty input yields the empty string.
+ private static string DecodeBinaryBody(byte[] body)
+ {
+ string result = string.Empty;
+ if(body != null && body.Length > 0)
+ {
+ result = Encoding.UTF8.GetString(body);
+ }
+ return result;
+ }
+
+ // Extracts the text from the current body section; throws IllegalStateException
+ // for a section or amqp-value content type that cannot carry text.
+ private string GetTextFromBody()
+ {
+ string result = string.Empty;
+ RestrictedDescribed body = this.message.BodySection;
+ if(body == null)
+ {
+ return result;
+ }
+ else if (body is Data)
+ {
+ byte[] data = (body as Data).Binary;
+ result = DecodeBinaryBody(data);
+ }
+ else if(body is AmqpValue)
+ {
+ object value = (body as AmqpValue).Value;
+ if(value == null)
+ {
+ return result;
+ }
+ else if (value is byte[])
+ {
+ result = DecodeBinaryBody(value as byte[]);
+ }
+ else if (value is string)
+ {
+ result = value as string;
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
+ }
+
+
+ return result;
+ }
+
+ /// <summary>Base description followed by the message text.</summary>
+ public override string ToString()
+ {
+ string result = base.ToString();
+ // Decode the body once instead of once for the check and once for the output.
+ string text = this.Text;
+ if (text != null)
+ {
+ result += string.Format("\nMessage Body: {0}\n", text);
+ }
+ return result;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/BytesMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/BytesMessage.cs b/src/main/csharp/Message/BytesMessage.cs
new file mode 100644
index 0000000..8186214
--- /dev/null
+++ b/src/main/csharp/Message/BytesMessage.cs
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+ using Cloak;
+ /// <summary>
+ /// Apache.NMS.AMQP.Message.BytesMessage inherits from Apache.NMS.AMQP.Message.Message that implements the Apache.NMS.IBytesMessage interface.
+ /// Apache.NMS.AMQP.Message.BytesMessage uses the Apache.NMS.AMQP.Message.Cloak.IBytesMessageCloak interface to detach from the underlying AMQP 1.0 engine.
+ /// </summary>
+ /// <remarks>
+ /// Reads go through a BinaryReader and writes through a BinaryWriter, both obtained
+ /// lazily from the cloak. Every typed Read* method maps EndOfStreamException to the
+ /// NMS message-EOF exception and other IOExceptions to the NMS message-format
+ /// exception; every typed Write* method wraps any exception via NMSExceptionSupport.Create.
+ /// </remarks>
+ class BytesMessage : Message, IBytesMessage
+ {
+ private BinaryWriter dataOut = null;
+ private BinaryReader dataIn = null;
+ private MemoryStream outputBuffer = null;
+ private readonly new IBytesMessageCloak cloak;
+
+ #region Constructor
+
+ internal BytesMessage(IBytesMessageCloak message) : base(message)
+ {
+ cloak = message;
+ }
+
+ #endregion
+
+ internal override Message Copy()
+ {
+ return new BytesMessage(this.cloak.Copy());
+ }
+
+ #region Private Methods
+
+ // Verifies the body may be read and lazily obtains the reader from the cloak.
+ private void InitializeReadingMode()
+ {
+ FailIfWriteOnlyMsgBody();
+ if(dataIn == null || dataIn.BaseStream == null)
+ {
+ dataIn = cloak.getDataReader();
+ }
+ }
+
+ // Verifies the body may be written and lazily obtains the writer from the cloak.
+ private void InitializeWritingMode()
+ {
+ FailIfReadOnlyMsgBody();
+ if(dataOut == null )
+ {
+ dataOut = cloak.getDataWriter();
+ }
+ }
+
+ // Closes the writer and, when an output buffer exists, publishes it as the content.
+ private void StoreContent()
+ {
+ if(dataOut != null)
+ {
+ dataOut.Close();
+ // outputBuffer is never assigned in this class, so it must be checked
+ // before use to avoid a NullReferenceException.
+ if (outputBuffer != null)
+ {
+ base.Content = outputBuffer.ToArray();
+ }
+
+ dataOut = null;
+ outputBuffer = null;
+ }
+ }
+
+ // Switches to reading mode, runs one read against the reader and translates
+ // stream errors into NMS exceptions. EndOfStreamException must be caught before
+ // IOException because it derives from it.
+ private TResult ReadFromBody<TResult>(Func<BinaryReader, TResult> read)
+ {
+ InitializeReadingMode();
+ try
+ {
+ return read(dataIn);
+ }
+ catch (EndOfStreamException e)
+ {
+ throw NMSExceptionSupport.CreateMessageEOFException(e);
+ }
+ catch (IOException e)
+ {
+ throw NMSExceptionSupport.CreateMessageFormatException(e);
+ }
+ }
+
+ // Switches to writing mode, runs one write against the writer and wraps any
+ // failure via NMSExceptionSupport.Create.
+ private void WriteToBody(Action<BinaryWriter> write)
+ {
+ InitializeWritingMode();
+ try
+ {
+ write(dataOut);
+ }
+ catch (Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ #endregion
+
+ #region IBytesMessage Properties
+
+ /// <summary>
+ /// Getter: reads up to BodyLength bytes from the reader into a new array, or returns
+ /// null when the body length is zero. Setter: writes the given bytes (if any) to the writer.
+ /// </summary>
+ public override byte[] Content
+ {
+ get
+ {
+ byte[] buffer = null;
+ InitializeReadingMode();
+ if(this.cloak.BodyLength != 0)
+ {
+ buffer = new byte[this.cloak.BodyLength];
+ dataIn.Read(buffer, 0, buffer.Length);
+ }
+ return buffer;
+ }
+ set
+ {
+ InitializeWritingMode();
+ if(value != null)
+ {
+ this.dataOut.Write(value, 0, value.Length);
+ }
+ }
+ }
+
+ /// <summary>Body length reported by the cloak; requires the body to be readable.</summary>
+ public long BodyLength
+ {
+ get
+ {
+ InitializeReadingMode();
+ return this.cloak.BodyLength;
+ }
+ }
+
+ #endregion
+
+ #region IBytesMessage Methods
+
+ /// <summary>Drops reader and writer, makes the message writable and clears the base body.</summary>
+ public override void ClearBody()
+ {
+ dataIn = null;
+ dataOut = null;
+ outputBuffer = null;
+ IsReadOnly = false;
+ base.ClearBody();
+ }
+
+ /// <summary>Drops reader and writer, resets the cloak and makes the message read-only.</summary>
+ public void Reset()
+ {
+ dataIn = null;
+ dataOut = null;
+ outputBuffer = null;
+ this.cloak.Reset();
+ IsReadOnly = true;
+ }
+
+ public bool ReadBoolean()
+ {
+ return ReadFromBody(reader => reader.ReadBoolean());
+ }
+
+ public byte ReadByte()
+ {
+ return ReadFromBody(reader => reader.ReadByte());
+ }
+
+ /// <summary>Reads up to value.Length bytes into <paramref name="value"/>; returns the count read.</summary>
+ public int ReadBytes(byte[] value)
+ {
+ return ReadFromBody(reader => reader.Read(value, 0, value.Length));
+ }
+
+ /// <summary>Reads up to <paramref name="length"/> bytes into <paramref name="value"/>; returns the count read.</summary>
+ public int ReadBytes(byte[] value, int length)
+ {
+ return ReadFromBody(reader => reader.Read(value, 0, length));
+ }
+
+ public char ReadChar()
+ {
+ return ReadFromBody(reader => reader.ReadChar());
+ }
+
+ public double ReadDouble()
+ {
+ return ReadFromBody(reader => reader.ReadDouble());
+ }
+
+ public short ReadInt16()
+ {
+ return ReadFromBody(reader => reader.ReadInt16());
+ }
+
+ public int ReadInt32()
+ {
+ return ReadFromBody(reader => reader.ReadInt32());
+ }
+
+ public long ReadInt64()
+ {
+ return ReadFromBody(reader => reader.ReadInt64());
+ }
+
+ public float ReadSingle()
+ {
+ return ReadFromBody(reader => reader.ReadSingle());
+ }
+
+ public string ReadString()
+ {
+ // Note if dataIn is an EndianBinaryReader the string length is read as 16bit short
+ return ReadFromBody(reader => reader.ReadString());
+ }
+
+ public void WriteBoolean(bool value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteByte(byte value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteBytes(byte[] value)
+ {
+ WriteToBody(writer => writer.Write(value, 0, value.Length));
+ }
+
+ public void WriteBytes(byte[] value, int offset, int length)
+ {
+ WriteToBody(writer => writer.Write(value, offset, length));
+ }
+
+ public void WriteChar(char value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteDouble(double value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteInt16(short value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteInt32(int value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteInt64(long value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ /// <summary>
+ /// Writes <paramref name="value"/> according to its runtime type: byte[], string,
+ /// or one of the primitives byte, char, bool, short, int, long, float, double.
+ /// </summary>
+ /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
+ /// <exception cref="MessageFormatException">The runtime type is not supported.</exception>
+ public void WriteObject(object value)
+ {
+ InitializeWritingMode();
+
+ if (value == null)
+ {
+ throw new ArgumentNullException(nameof(value));
+ }
+
+ Type objType = value.GetType();
+ if(value is byte[])
+ {
+ dataOut.Write((byte[])value);
+ }
+ else if (value is String)
+ {
+ // string is not a CLR primitive, so it has to be tested before the
+ // IsPrimitive branch; nested inside it the case was unreachable.
+ dataOut.Write((string) value);
+ }
+ else if (objType.IsPrimitive)
+ {
+ if(value is Byte)
+ {
+ dataOut.Write((byte)value);
+ }
+ else if (value is Char)
+ {
+ dataOut.Write((char)value);
+ }
+ else if (value is Boolean)
+ {
+ dataOut.Write((bool)value);
+ }
+ else if (value is Int16)
+ {
+ dataOut.Write((short)value);
+ }
+ else if (value is Int32)
+ {
+ dataOut.Write((int)value);
+ }
+ else if (value is Int64)
+ {
+ dataOut.Write((long)value);
+ }
+ else if (value is Single)
+ {
+ dataOut.Write((float)value);
+ }
+ else if (value is Double)
+ {
+ dataOut.Write((double)value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot write primitive type:" + objType);
+ }
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot write non-primitive type:" + objType);
+ }
+
+ }
+
+ public void WriteSingle(float value)
+ {
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ public void WriteString(string value)
+ {
+ // note if dataOut is an EndianBinaryWriter, strings are written with a 16bit short length.
+ WriteToBody(writer => writer.Write(value));
+ }
+
+ #endregion
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs b/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs
new file mode 100644
index 0000000..118b5ab
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.IO;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+ /// <summary>
+ /// Provider-side view of a bytes message body, used by BytesMessage for all body access.
+ /// </summary>
+ internal interface IBytesMessageCloak : IMessageCloak
+ {
+ /// <summary>Returns the reader BytesMessage uses for its Read* operations.</summary>
+ BinaryReader getDataReader();
+ /// <summary>Returns the writer BytesMessage uses for its Write* operations.</summary>
+ BinaryWriter getDataWriter();
+
+ /// <summary>Returns a copy of this cloak typed as a bytes message cloak.</summary>
+ new IBytesMessageCloak Copy();
+
+ /// <summary>Length of the body; BytesMessage uses it to size its Content buffer.</summary>
+ int BodyLength { get; }
+ /// <summary>Called by BytesMessage.Reset() before the message is made read-only.</summary>
+ void Reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IMapMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IMapMessageCloak.cs b/src/main/csharp/Message/Cloak/IMapMessageCloak.cs
new file mode 100644
index 0000000..acb649a
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IMapMessageCloak.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+ /// <summary>
+ /// Provider-side view of a map message: a message cloak that exposes its body as an
+ /// NMS primitive map.
+ /// </summary>
+ interface IMapMessageCloak : IMessageCloak
+ {
+ /// <summary>The map body of the message.</summary>
+ IPrimitiveMap Map { get; }
+ /// <summary>Returns a copy of this cloak typed as a map message cloak.</summary>
+ new IMapMessageCloak Copy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IMessageCloak.cs b/src/main/csharp/Message/Cloak/IMessageCloak.cs
new file mode 100644
index 0000000..623449c
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IMessageCloak.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+ /// <summary>
+ /// Provider-specific cloak interface. Extends <see cref="IMessage"/> with
+ /// raw content access, read-only flags, annotation accessors, delivery
+ /// counters and an acknowledgement handler.
+ /// </summary>
+ interface IMessageCloak : IMessage
+ {
+     /// <summary>Gets or sets the message content as a byte array.</summary>
+     byte[] Content { get; set; }
+
+     /// <summary>Gets or sets the flag marking the message body as read-only.</summary>
+     bool IsBodyReadOnly { get; set; }
+
+     /// <summary>Gets or sets the flag marking the message properties as read-only.</summary>
+     bool IsPropertiesReadOnly { get; set; }
+
+     /// <summary>Gets a flag reporting whether the message was received (getter only).</summary>
+     bool IsReceived { get; }
+
+     /// <summary>Gets or sets the delivery count.</summary>
+     int DeliveryCount { get; set; }
+
+     /// <summary>
+     /// Gets or sets the redelivery count.
+     /// NOTE(review): how this relates to <see cref="DeliveryCount"/> is
+     /// decided by the implementation - confirm there.
+     /// </summary>
+     int RedeliveryCount { get; set; }
+
+     /// <summary>Gets or sets the acknowledgement handler attached to this message.</summary>
+     MessageAcknowledgementHandler AckHandler { get; set; }
+
+     /// <summary>Returns a message cloak; presumably a copy of this instance (confirm against the implementation).</summary>
+     IMessageCloak Copy();
+
+     /// <summary>Reads the message annotation stored under <paramref name="symbolKey"/>.</summary>
+     /// <param name="symbolKey">Annotation key; presumably an AMQP symbol name (confirm).</param>
+     /// <returns>The annotation value as an <see cref="object"/>.</returns>
+     object GetMessageAnnotation(string symbolKey);
+
+     /// <summary>Stores <paramref name="value"/> as the message annotation under <paramref name="symbolKey"/>.</summary>
+     /// <param name="symbolKey">Annotation key; presumably an AMQP symbol name (confirm).</param>
+     /// <param name="value">Annotation value.</param>
+     void SetMessageAnnotation(string symbolKey, object value);
+
+     /// <summary>Reads the delivery annotation stored under <paramref name="symbolKey"/>.</summary>
+     /// <param name="symbolKey">Annotation key; presumably an AMQP symbol name (confirm).</param>
+     /// <returns>The annotation value as an <see cref="object"/>.</returns>
+     object GetDeliveryAnnotation(string symbolKey);
+
+     /// <summary>Stores <paramref name="value"/> as the delivery annotation under <paramref name="symbolKey"/>.</summary>
+     /// <param name="symbolKey">Annotation key; presumably an AMQP symbol name (confirm).</param>
+     /// <param name="value">Annotation value.</param>
+     void SetDeliveryAnnotation(string symbolKey, object value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs b/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs
new file mode 100644
index 0000000..fcf4c97
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Runtime.Serialization;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+ /// <summary>
+ /// Identifies how the body of an object message is encoded.
+ /// Member names and numeric values are part of the contract used by
+ /// other files and must not change.
+ /// </summary>
+ internal enum AMQPObjectEncodingType
+ {
+     /// <summary>
+     /// Encoding could not be determined. The spelling "UNKOWN" (sic) is
+     /// kept because renaming would break existing references.
+     /// </summary>
+     UNKOWN = -1,
+
+     /// <summary>Presumably a body carried as a native AMQP type (confirm against the implementation).</summary>
+     AMQP_TYPE = 0,
+
+     /// <summary>Presumably a body produced by .NET serialization (confirm against the implementation).</summary>
+     DOTNET_SERIALIZABLE = 1,
+
+     /// <summary>Presumably a body produced by Java serialization (confirm against the implementation).</summary>
+     JAVA_SERIALIZABLE = 2
+ }
+ /// <summary>
+ /// Cloak contract for object messages: an <see cref="IMessageCloak"/> that
+ /// carries an object body together with its encoding type.
+ /// </summary>
+ interface IObjectMessageCloak : IMessageCloak
+ {
+     /// <summary>Gets or sets the object carried as the message body.</summary>
+     object Body { get; set; }
+
+     /// <summary>
+     /// Gets the <see cref="AMQPObjectEncodingType"/> of this cloak. The
+     /// property has no setter on this interface.
+     /// </summary>
+     AMQPObjectEncodingType Type { get; }
+
+     /// <summary>
+     /// Hides <see cref="IMessageCloak.Copy"/> so that the result is typed
+     /// as <see cref="IObjectMessageCloak"/>.
+     /// </summary>
+     /// <returns>An object message cloak; presumably a copy of this instance (confirm against the implementation).</returns>
+     new IObjectMessageCloak Copy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs b/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs
new file mode 100644
index 0000000..8775d1b
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+ /// <summary>
+ /// Cloak contract for stream messages: an <see cref="IMessageCloak"/> that
+ /// adds sequential element access (<see cref="Put"/>, <see cref="Peek"/>,
+ /// <see cref="Pop"/>) plus <see cref="HasNext"/> and <see cref="Reset"/>.
+ /// </summary>
+ interface IStreamMessageCloak : IMessageCloak
+ {
+     /// <summary>
+     /// Gets a flag reporting whether another element is available.
+     /// NOTE(review): exact position semantics are defined by the
+     /// implementation - confirm there.
+     /// </summary>
+     bool HasNext { get; }
+
+     /// <summary>
+     /// Hides <see cref="IMessageCloak.Copy"/> so that the result is typed
+     /// as <see cref="IStreamMessageCloak"/>.
+     /// </summary>
+     /// <returns>A stream message cloak; presumably a copy of this instance (confirm against the implementation).</returns>
+     new IStreamMessageCloak Copy();
+
+     /// <summary>Adds <paramref name="value"/> to the stream.</summary>
+     /// <param name="value">Element to add.</param>
+     void Put(object value);
+
+     /// <summary>
+     /// Returns an element from the stream; presumably the current one,
+     /// without advancing (confirm against the implementation).
+     /// </summary>
+     object Peek();
+
+     /// <summary>
+     /// Presumably advances past the element last returned by
+     /// <see cref="Peek"/> (confirm against the implementation).
+     /// </summary>
+     void Pop();
+
+     /// <summary>
+     /// Resets the stream; presumably returns the read position to the
+     /// start (confirm against the implementation).
+     /// </summary>
+     void Reset();
+ }
+}