You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:48 UTC

[11/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs
new file mode 100644
index 0000000..9754783
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMapMessageCloak.cs
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+    using Cloak;
+    using Factory;
+    using Util;
+    using Util.Types;
+    using Util.Types.Map.AMQP;
+    /// <summary>
+    /// Cloak for map messages. The AMQP body is an <see cref="AmqpValue"/> section whose
+    /// value is an <see cref="Amqp.Types.Map"/>; callers reach it through an
+    /// <see cref="IPrimitiveMap"/> constructed over that map.
+    /// </summary>
+    class AMQPMapMessageCloak : AMQPMessageCloak, IMapMessageCloak
+    {
+        // View returned by IMapMessageCloak.Map; built over amqpmap.
+        private IPrimitiveMap map = null;
+        // Map stored as the value of the message's AmqpValue body section.
+        private Map amqpmap = null;
+
+        /// <summary>Creates a cloak for a new message and installs an empty map body.</summary>
+        internal AMQPMapMessageCloak(Connection conn) : base(conn)
+        {
+            InitializeMapBody();
+        }
+
+        /// <summary>Wraps a received AMQP message and binds to (or creates) its map body.</summary>
+        /// <exception cref="NMSException">The body is neither empty nor an AmqpValue holding a Map.</exception>
+        internal AMQPMapMessageCloak(MessageConsumer c, Amqp.Message msg) : base(c, msg)
+        {
+            InitializeMapBody();
+        }
+
+        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_MAP; } }
+
+        /// <summary>
+        /// Points <c>amqpmap</c> and <c>map</c> at the message body. A missing body section
+        /// or a null section value is replaced with a new empty map.
+        /// </summary>
+        /// <exception cref="NMSException">
+        /// The body section is not an AmqpValue, or its value is something other than a Map.
+        /// </exception>
+        private void InitializeMapBody()
+        {
+            if (message.BodySection == null)
+            {
+                message.BodySection = new AmqpValue();
+            }
+
+            AmqpValue section = message.BodySection as AmqpValue;
+            if (section == null)
+            {
+                throw new NMSException("Invalid message body type.");
+            }
+
+            object obj = section.Value;
+            if (obj == null)
+            {
+                amqpmap = new Map();
+                section.Value = amqpmap;
+            }
+            else if (obj is Map)
+            {
+                amqpmap = obj as Map;
+            }
+            else
+            {
+                throw new NMSException(string.Format("Invalid message body value type. Type: {0}.", obj.GetType().Name));
+            }
+
+            map = new AMQPValueMap(amqpmap);
+        }
+
+        IPrimitiveMap IMapMessageCloak.Map
+        {
+            get { return map; }
+        }
+
+        /// <summary>Creates a new map cloak on the same connection and copies this one into it.</summary>
+        IMapMessageCloak IMapMessageCloak.Copy()
+        {
+            IMapMessageCloak copy = new AMQPMapMessageCloak(Connection);
+            CopyInto(copy);
+            return copy;
+        }
+
+        /// <summary>
+        /// Copies the base cloak state and then every map entry into <paramref name="msg"/>.
+        /// </summary>
+        /// <param name="msg">Target cloak; expected to implement <see cref="IMapMessageCloak"/>.</param>
+        protected override void CopyInto(IMessageCloak msg)
+        {
+            base.CopyInto(msg);
+            IPrimitiveMap copy = (msg as IMapMessageCloak).Map;
+            foreach (string key in this.map.Keys)
+            {
+                object value = map[key];
+                Type valType = (value == null) ? null : value.GetType();
+                if (valType == typeof(byte[]))
+                {
+                    // SetBytes is a deep copy in the most common IPrimitiveMap implementation.
+                    copy.SetBytes(key, (byte[])value);
+                }
+                else if (value is IDictionary)
+                {
+                    // Reference copy. A runtime type never equals the IDictionary interface
+                    // type itself, so the test must be "is", which also admits Map subclasses.
+                    copy.SetDictionary(key, (IDictionary)value);
+                }
+                else if (value is IList && !(value is Array))
+                {
+                    // Reference copy. Arrays also implement IList; they are deliberately left
+                    // to the plain assignment below.
+                    copy.SetList(key, (IList)value);
+                }
+                else
+                {
+                    // null, primitives, strings, arrays and any other value are assigned as-is.
+                    copy[key] = value;
+                }
+            }
+        }
+
+        /// <summary>Appends a rendering of the map body to the base class description.</summary>
+        public override string ToString()
+        {
+            string result = base.ToString();
+            if (this.map != null)
+            {
+                result += string.Format("\nMessage Body: {0}\n", ConversionSupport.ToString(this.map));
+            }
+            return result;
+        }
+
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs b/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs
new file mode 100644
index 0000000..95f1465
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMessageBuilder.cs
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+    using Util;
+    using Cloak;
+    
+    /// <summary>
+    /// Builds the NMS message wrapper for a received AMQP message. The message kind is
+    /// taken from the JMS message-type annotation when present, otherwise it is
+    /// inferred from the body section and the content-type.
+    /// </summary>
+    class AMQPMessageBuilder
+    {
+        /// <summary>Creates the NMS message for <paramref name="message"/>.</summary>
+        /// <exception cref="NMSException">No message could be created, or the type annotation is unsupported.</exception>
+        public static IMessage CreateProviderMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            IMessage msg = CreateFromMessageAnnontations(consumer, message);
+            if (msg == null)
+            {
+                msg = CreateFromMessageBody(consumer, message);
+            }
+            if (msg == null)
+            {
+                throw new NMSException("Could not create NMS Message.");
+            }
+            return msg;
+        }
+
+        /// <summary>
+        /// Chooses the message kind from the body: a null body and a Data section are
+        /// resolved through the content-type, an AmqpSequence becomes an object message,
+        /// and any other value is mapped by its CLR type (string, byte[], anything else).
+        /// </summary>
+        private static IMessage CreateFromMessageBody(MessageConsumer consumer, Amqp.Message message)
+        {
+            object body = message.Body;
+            if (body == null)
+            {
+                if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
+                {
+                    return CreateObjectMessage(consumer, message);
+                }
+                if (IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
+                {
+                    return CreateBytesMessage(consumer, message);
+                }
+                // Any other content-type is treated as text.
+                if (GetContentType(message) != null)
+                {
+                    return CreateTextMessage(consumer, message);
+                }
+                return CreateMessage(consumer, message);
+            }
+
+            if (message.BodySection is Data)
+            {
+                if (IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
+                {
+                    return CreateBytesMessage(consumer, message);
+                }
+                if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
+                {
+                    return CreateObjectMessage(consumer, message);
+                }
+                // Any other content-type is treated as text.
+                if (GetContentType(message) != null)
+                {
+                    return CreateTextMessage(consumer, message);
+                }
+                return CreateBytesMessage(consumer, message);
+            }
+
+            if (message.BodySection is AmqpSequence)
+            {
+                return CreateObjectMessage(consumer, message);
+            }
+            if (body is string)
+            {
+                return CreateTextMessage(consumer, message);
+            }
+            if (body is byte[])
+            {
+                return CreateBytesMessage(consumer, message);
+            }
+            return CreateObjectMessage(consumer, message);
+        }
+
+        /// <summary>Returns the content-type of the message, or null when it has no properties section.</summary>
+        private static Symbol GetContentType(Amqp.Message message)
+        {
+            Properties msgProps = message.Properties;
+            return (msgProps == null) ? null : msgProps.ContentType;
+        }
+
+        /// <summary>
+        /// Tests the message's content-type against <paramref name="type"/>.
+        /// </summary>
+        /// <param name="type">Expected content-type; null means "no content-type present".</param>
+        /// <returns>
+        /// True when both are null or both are equal; false otherwise.
+        /// </returns>
+        private static bool IsContentType(Symbol type, Amqp.Message message)
+        {
+            Symbol contentType = GetContentType(message);
+            if (contentType == null)
+            {
+                return type == null;
+            }
+            // Callers pass null for 'type' to ask "is there no content-type?"; when the
+            // message does carry one the answer is simply false, not a null dereference.
+            return type != null && type.Equals(contentType);
+        }
+
+        /// <summary>
+        /// Creates the message kind named by the JMS message-type annotation.
+        /// </summary>
+        /// <returns>
+        /// The message, or null when the annotation section or the annotation is absent or
+        /// the annotation is not a signed byte.
+        /// </returns>
+        /// <exception cref="NMSException">The annotation holds an unknown type code.</exception>
+        private static IMessage CreateFromMessageAnnontations(MessageConsumer consumer, Amqp.Message message)
+        {
+            // The annotations section can be null; there is nothing to inspect then.
+            if (message.MessageAnnotations == null)
+            {
+                return null;
+            }
+
+            object objVal = message.MessageAnnotations[SymbolUtil.JMSX_OPT_MSG_TYPE];
+            if (!(objVal is sbyte))
+            {
+                return null;
+            }
+
+            // Reinterpret the bits instead of Convert.ToByte so that a negative code ends
+            // up in the NMSException below rather than raising OverflowException.
+            byte type = unchecked((byte)(sbyte)objVal);
+            switch (type)
+            {
+                case MessageSupport.JMS_TYPE_MSG:
+                    return CreateMessage(consumer, message);
+                case MessageSupport.JMS_TYPE_BYTE:
+                    return CreateBytesMessage(consumer, message);
+                case MessageSupport.JMS_TYPE_TXT:
+                    return CreateTextMessage(consumer, message);
+                case MessageSupport.JMS_TYPE_OBJ:
+                    return CreateObjectMessage(consumer, message);
+                case MessageSupport.JMS_TYPE_STRM:
+                    return CreateStreamMessage(consumer, message);
+                case MessageSupport.JMS_TYPE_MAP:
+                    return CreateMapMessage(consumer, message);
+                default:
+                    throw new NMSException("Unsupported Msg Annontation type: " + type);
+            }
+        }
+
+        /// <summary>Wraps the AMQP message as a plain NMS message.</summary>
+        private static IMessage CreateMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            IMessageCloak cloak = new AMQPMessageCloak(consumer, message);
+            return new Message(cloak);
+        }
+
+        /// <summary>Wraps the AMQP message as a text message.</summary>
+        private static IMessage CreateTextMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            ITextMessageCloak cloak = new AMQPTextMessageCloak(consumer, message);
+            return new TextMessage(cloak);
+        }
+
+        /// <summary>Wraps the AMQP message as a stream message.</summary>
+        private static IMessage CreateStreamMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            IStreamMessageCloak cloak = new AMQPStreamMessageCloak(consumer, message);
+            return new StreamMessage(cloak);
+        }
+
+        /// <summary>Wraps the AMQP message as an object message.</summary>
+        private static IMessage CreateObjectMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            IObjectMessageCloak cloak = new AMQPObjectMessageCloak(consumer, message);
+            return new ObjectMessage(cloak);
+        }
+
+        /// <summary>Wraps the AMQP message as a map message.</summary>
+        private static IMessage CreateMapMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            IMapMessageCloak cloak = new AMQPMapMessageCloak(consumer, message);
+            return new MapMessage(cloak);
+        }
+
+        /// <summary>Wraps the AMQP message as a bytes message.</summary>
+        private static IMessage CreateBytesMessage(MessageConsumer consumer, Amqp.Message message)
+        {
+            IBytesMessageCloak cloak = new AMQPBytesMessageCloak(consumer, message);
+            return new BytesMessage(cloak);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs
new file mode 100644
index 0000000..dde222e
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMessageCloak.cs
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Util.Types;
+using Amqp;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+    using Util;
+    using Util.Types.Map.AMQP;
+    using Cloak;
+    using Factory;
+    using System.Reflection;
+
+    class AMQPMessageCloak : IMessageCloak
+    {
+        private TimeSpan timeToLive;
+        private IDestination replyTo;
+        private bool redelivered = false;
+        private string msgId;
+        private IDestination destination;
+        private string correlationId;
+        private IPrimitiveMap properties;
+        private MessagePropertyIntercepter propertyHelper;
+
+        private Header messageHeader = null;
+        private DeliveryAnnotations deliveryAnnontations = null;
+        private MessageAnnotations messageAnnontations = null;
+        private ApplicationProperties applicationProperties = null;
+        private Properties messageProperties = null;
+
+#pragma warning disable CS0414
+        private Footer messageFooter = null;
+#pragma warning restore CS0414
+
+        private byte[] content;
+        private bool readOnlyProperties = false;
+
+        protected Amqp.Message message;
+        protected readonly Connection connection;
+        protected MessageConsumer consumer;
+
+        /// <summary>
+        /// Creates a cloak around a new, empty <see cref="Amqp.Message"/> associated with
+        /// connection <paramref name="c"/> and runs the common message initialization.
+        /// </summary>
+        internal AMQPMessageCloak(Connection c)
+        {
+            this.connection = c;
+            this.message = new Amqp.Message();
+            this.InitMessage();
+        }
+
+        /// <summary>
+        /// Wraps an AMQP message received by consumer <paramref name="c"/>. The connection
+        /// is taken from the consumer's session; the delivery annotations section is
+        /// initialized in addition to the common message initialization.
+        /// </summary>
+        internal AMQPMessageCloak(MessageConsumer c, Amqp.Message msg)
+        {
+            this.consumer = c;
+            this.message = msg;
+            this.connection = c.Session.Connection;
+
+            this.InitMessage();
+            this.InitDeliveryAnnotations();
+        }
+
+
+        #region Internal Properties
+
+        /// <summary>Connection associated with this cloak.</summary>
+        internal Connection Connection
+        {
+            get { return this.connection; }
+        }
+
+        /// <summary>The AMQP message wrapped by this cloak.</summary>
+        internal Amqp.Message AMQPMessage
+        {
+            get { return this.message; }
+        }
+
+        /// <summary>JMS message-type code of this cloak; this class reports the plain message type.</summary>
+        internal virtual byte JMSMessageType
+        {
+            get { return MessageSupport.JMS_TYPE_MSG; }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Initializes the header and properties sections, then writes this cloak's JMS
+        /// message-type code into the message annotations as a signed byte.
+        /// </summary>
+        private void InitMessage()
+        {
+            this.InitMessageHeader();
+            this.InitMessageProperties();
+
+            sbyte typeCode = (sbyte)this.JMSMessageType;
+            this.SetMessageAnnotation(SymbolUtil.JMSX_OPT_MSG_TYPE, typeCode);
+        }
+
+        /// <summary>
+        /// Copies this cloak's NMS message properties into <paramref name="msg"/> and hands
+        /// it the same acknowledgement handler.
+        /// </summary>
+        protected virtual void CopyInto(IMessageCloak msg)
+        {
+            MessageTransformation.CopyNMSMessageProperties(this, msg);
+
+            MessageAcknowledgementHandler handler = this.AckHandler;
+            msg.AckHandler = handler;
+        }
+
+        #region Protected Amqp.Message Initialize/Accessor
+
+        /// <summary>
+        /// Reconciles the cached header with the message's header section: adopts the
+        /// message's section when nothing is cached, creates a new one when neither
+        /// exists, and pushes the cached one when the message has none.
+        /// </summary>
+        protected void InitMessageHeader()
+        {
+            Header onMessage = this.message.Header;
+            if (this.messageHeader == null)
+            {
+                // Nothing cached yet: reuse the message's section or start a new one.
+                this.messageHeader = onMessage ?? new Header();
+                if (onMessage == null)
+                {
+                    this.message.Header = this.messageHeader;
+                }
+            }
+            else if (onMessage == null)
+            {
+                this.message.Header = this.messageHeader;
+            }
+        }
+
+        /// <summary>
+        /// Reconciles the cached properties section with the message's: adopts the
+        /// message's section when nothing is cached, creates a new one when neither
+        /// exists, and pushes the cached one when the message has none.
+        /// </summary>
+        protected void InitMessageProperties()
+        {
+            Properties onMessage = this.message.Properties;
+            if (this.messageProperties == null)
+            {
+                // Nothing cached yet: reuse the message's section or start a new one.
+                this.messageProperties = onMessage ?? new Properties();
+                if (onMessage == null)
+                {
+                    this.message.Properties = this.messageProperties;
+                }
+            }
+            else if (onMessage == null)
+            {
+                this.message.Properties = this.messageProperties;
+            }
+        }
+
+        /// <summary>
+        /// Reconciles the cached application-properties section with the message's: adopts
+        /// the message's section when nothing is cached, creates a new one when neither
+        /// exists, and pushes the cached one when the message has none.
+        /// </summary>
+        protected void InitApplicationProperties()
+        {
+            ApplicationProperties onMessage = this.message.ApplicationProperties;
+            if (this.applicationProperties == null)
+            {
+                // Nothing cached yet: reuse the message's section or start a new one.
+                this.applicationProperties = onMessage ?? new ApplicationProperties();
+                if (onMessage == null)
+                {
+                    this.message.ApplicationProperties = this.applicationProperties;
+                }
+            }
+            else if (onMessage == null)
+            {
+                this.message.ApplicationProperties = this.applicationProperties;
+            }
+        }
+
+        /// <summary>
+        /// Reconciles the cached delivery-annotations section with the message's: adopts
+        /// the message's section when nothing is cached, creates a new one when neither
+        /// exists, and pushes the cached one when the message has none.
+        /// </summary>
+        protected void InitDeliveryAnnotations()
+        {
+            DeliveryAnnotations onMessage = this.message.DeliveryAnnotations;
+            if (this.deliveryAnnontations == null)
+            {
+                // Nothing cached yet: reuse the message's section or start a new one.
+                this.deliveryAnnontations = onMessage ?? new DeliveryAnnotations();
+                if (onMessage == null)
+                {
+                    this.message.DeliveryAnnotations = this.deliveryAnnontations;
+                }
+            }
+            else if (onMessage == null)
+            {
+                this.message.DeliveryAnnotations = this.deliveryAnnontations;
+            }
+        }
+
+        /// <summary>
+        /// Reconciles the cached message-annotations section with the message's: adopts
+        /// the message's section when nothing is cached, creates a new one when neither
+        /// exists, and pushes the cached one when the message has none.
+        /// </summary>
+        protected void InitMessageAnnontations()
+        {
+            MessageAnnotations onMessage = this.message.MessageAnnotations;
+            if (this.messageAnnontations == null)
+            {
+                // Nothing cached yet: reuse the message's section or start a new one.
+                this.messageAnnontations = onMessage ?? new MessageAnnotations();
+                if (onMessage == null)
+                {
+                    this.message.MessageAnnotations = this.messageAnnontations;
+                }
+            }
+            else if (onMessage == null)
+            {
+                this.message.MessageAnnotations = this.messageAnnontations;
+            }
+        }
+
+        /// <summary>
+        /// Stores <paramref name="value"/> under <paramref name="key"/> in the delivery
+        /// annotations, initializing that section first.
+        /// </summary>
+        protected void SetDeliveryAnnotation(Symbol key, object value)
+        {
+            this.InitDeliveryAnnotations();
+
+            DeliveryAnnotations section = this.deliveryAnnontations;
+            section[key] = value;
+        }
+
+        /// <summary>
+        /// Stores <paramref name="value"/> under <paramref name="key"/> in the message
+        /// annotations, initializing that section first.
+        /// </summary>
+        protected void SetMessageAnnotation(Symbol key, object value)
+        {
+            this.InitMessageAnnontations();
+
+            MessageAnnotations section = this.messageAnnontations;
+            section[key] = value;
+        }
+
+        /// <summary>
+        /// Reads the message annotation stored under <paramref name="key"/>, initializing
+        /// the annotations section first so the lookup never hits a missing section.
+        /// </summary>
+        protected object GetMessageAnnotation(Symbol key)
+        {
+            this.InitMessageAnnontations();
+
+            MessageAnnotations section = this.messageAnnontations;
+            return section[key];
+        }
+
+        #endregion
+
+        #region IMessageCloak Properties
+
+        /// <summary>True when this cloak has a consumer, i.e. it was built for a received message.</summary>
+        public bool IsReceived
+        {
+            get { return !(consumer == null); }
+        }
+
+        /// <summary>Content buffer reference held by this cloak; stored and returned as-is.</summary>
+        public virtual byte[] Content
+        {
+            get { return this.content; }
+            set { this.content = value; }
+        }
+
+        /// <summary>Read-only flag for the body; a plain stored value at this level.</summary>
+        public virtual bool IsBodyReadOnly { get; set; }
+
+        /// <summary>
+        /// Read-only flag for the message properties. Once the property intercepter exists
+        /// its flag is the one reported; writes go to the intercepter (when present) and
+        /// to the local field.
+        /// </summary>
+        public virtual bool IsPropertiesReadOnly
+        {
+            get
+            {
+                if (this.propertyHelper != null)
+                {
+                    return this.propertyHelper.ReadOnly;
+                }
+                return readOnlyProperties;
+            }
+            set
+            {
+                if (this.propertyHelper != null)
+                {
+                    this.propertyHelper.ReadOnly = value;
+                }
+                readOnlyProperties = value;
+            }
+        }
+
+
+        /// <summary>
+        /// Correlation id as an NMS string. The getter caches its result: a string id from
+        /// the AMQP properties is returned unchanged, any other id type is converted to an
+        /// NMS message-id string. The setter converts the string to an AMQP id, stores it
+        /// in the properties section and caches the original string.
+        /// </summary>
+        public string NMSCorrelationID
+        {
+            get
+            {
+                if (this.correlationId == null)
+                {
+                    object objId = this.messageProperties.GetCorrelationId();
+                    if (objId != null)
+                    {
+                        // Strings pass through untouched; everything else is rendered as
+                        // an NMS message-id string.
+                        string text = objId as string;
+                        this.correlationId = text ?? MessageSupport.CreateNMSMessageId(objId);
+                    }
+                }
+                return this.correlationId;
+            }
+            set
+            {
+                object amqpId = MessageSupport.CreateAMQPMessageId(value);
+                this.messageProperties.SetCorrelationId(amqpId);
+                this.correlationId = value;
+            }
+        }
+
+        /// <summary>
+        /// Delivery mode, backed by the header's Durable flag: durable maps to Persistent,
+        /// anything else to NonPersistent.
+        /// </summary>
+        public MsgDeliveryMode NMSDeliveryMode
+        {
+            get
+            {
+                return this.messageHeader.Durable
+                    ? MsgDeliveryMode.Persistent
+                    : MsgDeliveryMode.NonPersistent;
+            }
+            set
+            {
+                this.messageHeader.Durable = (value == MsgDeliveryMode.Persistent);
+            }
+        }
+
+        /// <summary>
+        /// Destination of the message. For a received message the getter resolves it once
+        /// from the destination-type annotation and the properties section, and falls back
+        /// to the consumer's destination when that yields nothing, including when the
+        /// annotation is absent. The setter writes the address into the properties
+        /// section and records the destination type as a message annotation.
+        /// </summary>
+        public IDestination NMSDestination
+        {
+            get
+            {
+                if (destination == null && consumer != null)
+                {
+                    object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST);
+                    if (typeObj != null)
+                    {
+                        byte type = Convert.ToByte(typeObj);
+                        destination = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type);
+                    }
+                    // The fallback applies whether the annotation was missing or the
+                    // lookup produced no destination.
+                    if (destination == null)
+                    {
+                        destination = consumer.Destination;
+                    }
+                }
+                return destination;
+            }
+            set
+            {
+                string destString = null;
+                if (value != null)
+                {
+                    destString = UriUtil.GetAddress(value, Connection);
+                }
+                this.messageProperties.To = destString;
+                SetMessageAnnotation(SymbolUtil.JMSX_OPT_DEST, MessageSupport.GetValueForDestination(value));
+                destination = value;
+            }
+        }
+
+        /// <summary>
+        /// Message id as an NMS string. The getter converts the id found in the AMQP
+        /// properties on first use and caches it; the setter converts the string to an
+        /// AMQP id, stores it in the properties section and caches the string.
+        /// </summary>
+        public string NMSMessageId
+        {
+            get
+            {
+                object objId = this.messageProperties.GetMessageId();
+                bool needsConversion = (this.msgId == null) && (objId != null);
+                if (needsConversion)
+                {
+                    this.msgId = MessageSupport.CreateNMSMessageId(objId);
+                }
+                return this.msgId;
+            }
+            set
+            {
+                object amqpId = MessageSupport.CreateAMQPMessageId(value);
+                this.messageProperties.SetMessageId(amqpId);
+                this.msgId = value;
+            }
+        }
+
+        /// <summary>
+        /// Message priority, stored in the header's Priority field and translated in both
+        /// directions by MessageSupport.
+        /// </summary>
+        public MsgPriority NMSPriority
+        {
+            get
+            {
+                MsgPriority priority = MessageSupport.GetPriorityFromValue(this.messageHeader.Priority);
+                return priority;
+            }
+            set { this.messageHeader.Priority = MessageSupport.GetValueForPriority(value); }
+        }
+
+        /// <summary>
+        /// Redelivered flag. Reading it latches the flag to true whenever the header's
+        /// delivery count is above zero; otherwise the last assigned value is returned.
+        /// </summary>
+        public bool NMSRedelivered
+        {
+            get
+            {
+                // Non-short-circuit OR: the delivery count is consulted on every read.
+                redelivered |= this.messageHeader.DeliveryCount > 0;
+                return redelivered;
+            }
+            set
+            {
+                redelivered = value;
+            }
+        }
+
+        /// <summary>
+        /// Reply-to destination. For a received message the getter resolves it once from
+        /// the reply-to type annotation and the properties section. The setter writes the
+        /// address into the properties section and, for a non-null destination only,
+        /// records the destination type as a message annotation.
+        /// </summary>
+        public IDestination NMSReplyTo
+        {
+            get
+            {
+                if (replyTo != null || !IsReceived)
+                {
+                    return replyTo;
+                }
+
+                object typeObj = GetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO);
+                if (typeObj != null)
+                {
+                    byte type = Convert.ToByte(typeObj);
+                    replyTo = MessageSupport.CreateDestinationFromMessage(Connection, messageProperties, type, true);
+                }
+                return replyTo;
+            }
+            set
+            {
+                string address = null;
+                if (value != null)
+                {
+                    address = UriUtil.GetAddress(value, Connection);
+                    SetMessageAnnotation(SymbolUtil.JMSX_OPT_REPLY_TO, MessageSupport.GetValueForDestination(value));
+                }
+
+                this.messageProperties.ReplyTo = address;
+                replyTo = value;
+            }
+        }
+
+        /// <summary>
+        /// Creation time of the message. Setting it also writes the absolute expiry time
+        /// (creation time plus time-to-live) when the time-to-live is non-zero.
+        /// </summary>
+        public DateTime NMSTimestamp
+        {
+            get { return messageProperties.CreationTime; }
+            set
+            {
+                messageProperties.CreationTime = value;
+                // Add the same TTL that the NMSTimeToLive getter reports, not the raw
+                // field, which is still zero when the TTL is derived from the message.
+                TimeSpan ttl = NMSTimeToLive;
+                if (ttl != TimeSpan.Zero)
+                {
+                    messageProperties.AbsoluteExpiryTime = value + ttl;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Time-to-live of the message. A non-zero assigned value is returned as-is.
+        /// Otherwise the value is derived from the message: from the header's Ttl
+        /// (milliseconds, then cached) when no absolute expiry time is present, or as the
+        /// absolute expiry time minus the creation time.
+        /// </summary>
+        public TimeSpan NMSTimeToLive
+        {
+            get
+            {
+                // TimeSpan is a struct and can never be null, so TimeSpan.Zero serves as
+                // the "not assigned" marker that lets the derivation below run.
+                if (timeToLive != TimeSpan.Zero)
+                {
+                    return timeToLive;
+                }
+                if (messageProperties.AbsoluteExpiryTime == DateTime.MinValue)
+                {
+                    timeToLive = TimeSpan.FromMilliseconds(Convert.ToDouble(this.messageHeader.Ttl));
+                    return timeToLive;
+                }
+                return messageProperties.AbsoluteExpiryTime - NMSTimestamp;
+            }
+            set
+            {
+                timeToLive = value;
+            }
+        }
+
+        /// <summary>NMS type string, stored in the Subject field of the AMQP properties section.</summary>
+        public string NMSType
+        {
+            get
+            {
+                return this.messageProperties.Subject;
+            }
+            set
+            {
+                this.messageProperties.Subject = value;
+            }
+        }
+
+        /// <summary>
+        /// Application properties of the message. On first access a primitive map is built
+        /// over the application-properties section and wrapped in a property intercepter
+        /// seeded with the current read-only flag; the intercepter is what is returned.
+        /// </summary>
+        public IPrimitiveMap Properties
+        {
+            get
+            {
+                if (properties != null)
+                {
+                    return propertyHelper;
+                }
+
+                InitApplicationProperties();
+                properties = new AMQPPrimitiveMap(this.applicationProperties);
+                propertyHelper = new MessagePropertyIntercepter(this, properties, readOnlyProperties);
+                return propertyHelper;
+            }
+        }
+
+        /// <summary>
+        /// The header's delivery count as a signed integer. Assigning a value converts it
+        /// with Convert.ToUInt32, which throws OverflowException for negative input.
+        /// </summary>
+        public int DeliveryCount
+        {
+            get { return Convert.ToInt32(this.messageHeader.DeliveryCount); }
+            set
+            {
+                uint count = Convert.ToUInt32(value);
+                this.messageHeader.DeliveryCount = count;
+            }
+        }
+
+        /// <summary>
+        /// Redelivery count, defined as <see cref="DeliveryCount"/> minus one; assigning it
+        /// sets <see cref="DeliveryCount"/> to the value plus one.
+        /// </summary>
+        public int RedeliveryCount
+        {
+            get
+            {
+                int deliveries = DeliveryCount;
+                return deliveries - 1;
+            }
+            set { DeliveryCount = value + 1; }
+        }
+
+        /// <summary>Handler used by <see cref="Acknowledge"/>; null when there is nothing to acknowledge.</summary>
+        public MessageAcknowledgementHandler AckHandler { get; set; }
+
+        /// <summary>
+        /// Acknowledges the message through the handler and then clears the handler, so a
+        /// second call does nothing. Without a handler the call is a no-op.
+        /// </summary>
+        /// <exception cref="IllegalStateException">A handler is set but the connection is closed.</exception>
+        public void Acknowledge()
+        {
+            if (AckHandler == null)
+            {
+                return;
+            }
+            if (connection.IsClosed)
+            {
+                throw new IllegalStateException("Can not acknowledge Message on closed connection.");
+            }
+
+            AckHandler.Acknowledge();
+            AckHandler = null;
+        }
+
+        /// <summary>
+        /// Clears the message body by assigning null to <c>Content</c>.
+        /// </summary>
+        public virtual void ClearBody()
+        {
+            this.Content = null;
+        }
+
+        /// <summary>
+        /// Clears the application properties, if the property map has been created.
+        /// </summary>
+        public virtual void ClearProperties()
+        {
+            if (properties == null)
+            {
+                // Properties were never accessed, so there is nothing to clear.
+                return;
+            }
+
+            propertyHelper.Clear();
+        }
+        
+        /// <summary>
+        /// Creates a new cloak of the concrete type selected by <c>JMSMessageType</c>
+        /// and copies this message into it via <c>CopyInto</c>.
+        /// </summary>
+        /// <returns>The populated copy.</returns>
+        /// <exception cref="NMSException">
+        /// The JMS type is unknown, or it reports an object message while this
+        /// instance is not an <see cref="AMQPObjectMessageCloak"/>.
+        /// </exception>
+        public virtual IMessageCloak Copy()
+        {
+            IMessageCloak copy = null;
+            switch(JMSMessageType)
+            {
+                case MessageSupport.JMS_TYPE_MSG:
+                    copy = new AMQPMessageCloak(connection);
+                    break;
+                case MessageSupport.JMS_TYPE_BYTE:
+                    copy = new AMQPBytesMessageCloak(connection);
+                    break;
+                case MessageSupport.JMS_TYPE_TXT:
+                    copy = new AMQPTextMessageCloak(connection);
+                    break;
+                case MessageSupport.JMS_TYPE_MAP:
+                    copy = new AMQPMapMessageCloak(connection);
+                    break;
+                case MessageSupport.JMS_TYPE_STRM:
+                    copy = new AMQPStreamMessageCloak(connection);
+                    break;
+                case MessageSupport.JMS_TYPE_OBJ:
+                    // The encoding type lives on the object cloak; check the cast
+                    // instead of dereferencing a possibly-null "as" result.
+                    AMQPObjectMessageCloak objectCloak = this as AMQPObjectMessageCloak;
+                    if (objectCloak == null)
+                    {
+                        throw new NMSException("Fatal error Invalid JMS type.");
+                    }
+                    copy = new AMQPObjectMessageCloak(connection, objectCloak.Type);
+                    break;
+                default:
+                    throw new NMSException("Fatal error Invalid JMS type.");
+            }
+            
+            CopyInto(copy);
+            return copy;
+        }
+
+        /// <summary>
+        /// Looks up a message annotation by string key; the key is converted to a
+        /// <see cref="Symbol"/> and forwarded to the Symbol overload.
+        /// </summary>
+        public object GetMessageAnnotation(string symbolKey)
+        {
+            return GetMessageAnnotation((Symbol)symbolKey);
+        }
+
+        /// <summary>
+        /// Sets a message annotation by string key; the key is converted to a
+        /// <see cref="Symbol"/> and forwarded to the Symbol overload.
+        /// </summary>
+        public void SetMessageAnnotation(string symbolKey, object value)
+        {
+            SetMessageAnnotation((Symbol)symbolKey, value);
+        }
+
+        /// <summary>
+        /// Looks up a delivery annotation by string key; the key is converted to a
+        /// <see cref="Symbol"/> and forwarded to the Symbol overload.
+        /// </summary>
+        public object GetDeliveryAnnotation(string symbolKey)
+        {
+            return GetDeliveryAnnotation((Symbol)symbolKey);
+        }
+
+        /// <summary>
+        /// Sets a delivery annotation by string key; the key is converted to a
+        /// <see cref="Symbol"/> and forwarded to the Symbol overload.
+        /// </summary>
+        public void SetDeliveryAnnotation(string symbolKey, object value)
+        {
+            SetDeliveryAnnotation((Symbol)symbolKey, value);
+        }
+
+        /// <summary>
+        /// Returns the content type as a string, converted from the symbol
+        /// supplied by <see cref="GetContentTypeSymbol"/>.
+        /// </summary>
+        public string GetContentType()
+        {
+            Symbol contentType = GetContentTypeSymbol();
+            return contentType;
+        }
+
+        /// <summary>
+        /// Sets the content type from a string by wrapping it in a new <see cref="Symbol"/>.
+        /// </summary>
+        public void SetContentType(string type)
+        {
+            Symbol contentType = new Symbol(type);
+            SetContentType(contentType);
+        }
+
+        /// <summary>
+        /// Reads the ContentType symbol from the AMQP properties section.
+        /// </summary>
+        protected virtual Symbol GetContentTypeSymbol()
+        {
+            Symbol contentType = messageProperties.ContentType;
+            return contentType;
+        }
+
+        /// <summary>
+        /// Writes the ContentType symbol to the AMQP properties section.
+        /// </summary>
+        protected virtual void SetContentType(Symbol type)
+        {
+            messageProperties.ContentType = type;
+        }
+        
+        #endregion
+
+        /// <summary>
+        /// Builds a diagnostic dump: the runtime type, the inner AMQP message, and
+        /// every public property that has a public getter, read through reflection.
+        /// </summary>
+        /// <remarks>
+        /// Write-only properties and indexers are skipped, since they cannot be read
+        /// with <c>GetValue(this, null)</c>. A getter that throws is logged through
+        /// <c>Tracer</c> and omitted from the output.
+        /// </remarks>
+        public override string ToString()
+        {
+            string result = string.Format("{0}:\n", this.GetType());
+            result += string.Format("inner amqp message: \n{0}\n", AMQPMessageCloak.ToString(message));
+            result += "NMS Fields = [\n";
+            bool wroteField = false;
+            foreach (MemberInfo info in this.GetType().GetMembers())
+            {
+                PropertyInfo prop = info as PropertyInfo;
+                if (prop == null)
+                {
+                    continue;
+                }
+
+                // GetGetMethod returns null when the property has no getter at all.
+                MethodInfo getter = prop.GetGetMethod(true);
+                if (getter == null || !getter.IsPublic || prop.GetIndexParameters().Length > 0)
+                {
+                    continue;
+                }
+
+                try
+                {
+                    Object val = prop.GetValue(this, null);
+                    if (val is IPrimitiveMap )
+                    {
+                        result += prop.Name + " = " + ConversionSupport.ToString(val as IPrimitiveMap) +",\n";
+                    }
+                    else
+                    {
+                        result += string.Format("{0} = {1},\n", prop.Name, val);
+                    }
+                    wroteField = true;
+                }
+                catch(TargetInvocationException tie)
+                {
+                    Tracer.InfoFormat("Failed to invoke Member field accessor: {0}, cause: {1}", prop.Name, tie);
+                }
+            }
+
+            if (wroteField)
+            {
+                // Drop the trailing ",\n" left by the last field.
+                result = result.Substring(0, result.Length - 2) + "\n]";
+            }
+            else
+            {
+                // Nothing was appended; trimming here would remove the opening bracket.
+                result += "]";
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Describes an AMQP message: its type name and, when present, its header.
+        /// </summary>
+        /// <param name="message">Message to describe; may be null.</param>
+        /// <returns>The description, or the literal "null" for a null message.</returns>
+        public static string ToString(Amqp.Message message)
+        {
+            if (message == null)
+            {
+                return "null";
+            }
+
+            string description = "Type="+ message.GetType().Name +":\n";
+            if (message.Header == null)
+            {
+                return description;
+            }
+
+            return description + "Message Header: " + message.Header.ToString() + "\n";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs b/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs
new file mode 100644
index 0000000..454b8e8
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPMessageTransformation.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Message.Factory;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+    /// <summary>
+    /// <see cref="MessageTransformation"/> that creates messages through an
+    /// <see cref="AMQPMessageFactory{T}"/> and transforms destinations against
+    /// that factory's parent connection.
+    /// </summary>
+    class AMQPMessageTransformation <T> : MessageTransformation where T:ConnectionInfo
+    {
+        protected readonly Connection connection;
+        protected readonly MessageFactory<T> factory;
+
+        /// <summary>
+        /// Captures the factory and its parent connection.
+        /// </summary>
+        public AMQPMessageTransformation(AMQPMessageFactory<T> fact)
+        {
+            factory = fact;
+            connection = fact.Parent;
+        }
+
+        protected override IMessage DoCreateMessage() => factory.CreateMessage();
+
+        protected override IBytesMessage DoCreateBytesMessage() => factory.CreateBytesMessage();
+
+        protected override IMapMessage DoCreateMapMessage() => factory.CreateMapMessage();
+
+        // The object message is created with a null body.
+        protected override IObjectMessage DoCreateObjectMessage() => factory.CreateObjectMessage(null);
+
+        protected override IStreamMessage DoCreateStreamMessage() => factory.CreateStreamMessage();
+
+        protected override ITextMessage DoCreateTextMessage() => factory.CreateTextMessage();
+
+        protected override void DoPostProcessMessage(IMessage message)
+        {
+            // nothing for now
+        }
+
+        protected override IDestination DoTransformDestination(IDestination destination) =>
+            DestinationTransformation.Transform(connection, destination);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs
new file mode 100644
index 0000000..49b6a03
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPObjectMessageCloak.cs
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Runtime.Serialization;
+using System.Runtime.Serialization.Formatters.Binary;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Framing;
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+    using Amqp;
+    using Cloak;
+    using Util;
+    using Util.Types;
+    
+    /// <summary>
+    /// Cloak for object messages. The body is read and written through an
+    /// <see cref="IAMQPObjectSerializer"/> chosen by the object encoding type.
+    /// </summary>
+    class AMQPObjectMessageCloak : AMQPMessageCloak, IObjectMessageCloak
+    {
+
+        public static AMQPObjectEncodingType DEFAULT_ENCODING_TYPE = AMQPObjectEncodingType.AMQP_TYPE;
+
+        private IAMQPObjectSerializer objectSerializer;
+
+        #region Constructors
+        /// <summary>
+        /// Creates an outgoing object message with the given encoding and a null body.
+        /// </summary>
+        internal AMQPObjectMessageCloak(Apache.NMS.AMQP.Connection c, AMQPObjectEncodingType type) : base(c)
+        {
+            InitializeObjectSerializer(type);
+            Body = null;
+        }
+
+        /// <summary>
+        /// Wraps a received message and selects the serializer from its content type
+        /// and message annotations. A message without a content type, or without
+        /// message annotations, is handled instead of causing a null dereference.
+        /// </summary>
+        internal AMQPObjectMessageCloak(MessageConsumer mc, Amqp.Message message) : base(mc, message)
+        {
+            Symbol contentType = message.Properties != null ? message.Properties.ContentType : null;
+            if (contentType != null && contentType.Equals(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE))
+            {
+                var annotations = message.MessageAnnotations != null ? message.MessageAnnotations.Map : null;
+                if(annotations != null
+                    && annotations.ContainsKey(MessageSupport.JMS_JAVA_ENCODING)
+                    && annotations[MessageSupport.JMS_JAVA_ENCODING].Equals(SymbolUtil.BOOLEAN_TRUE))
+                {
+                    InitializeObjectSerializer(AMQPObjectEncodingType.JAVA_SERIALIZABLE);
+                }
+                else
+                {
+                    InitializeObjectSerializer(AMQPObjectEncodingType.DOTNET_SERIALIZABLE);
+                }
+            }
+            else
+            {
+                InitializeObjectSerializer(AMQPObjectEncodingType.AMQP_TYPE);
+            }
+        }
+
+        #endregion
+
+        #region Internal Properties Fields
+        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_OBJ; } }
+        #endregion
+
+        #region Public IObjectMessageCloak Properties
+        /// <summary>Encoding type of the active serializer.</summary>
+        public AMQPObjectEncodingType Type { get { return this.objectSerializer.Type; } }
+
+        /// <summary>Object body, read and written through the active serializer.</summary>
+        public object Body
+        {
+            get
+            {
+                return this.objectSerializer.GetObject();
+            }
+            set
+            {
+                this.objectSerializer.SetObject(value);
+            }
+        }
+
+        /// <summary>
+        /// Raw content is not exposed for object messages: the getter returns null
+        /// and the setter ignores its value.
+        /// </summary>
+        public override byte[] Content
+        {
+            get
+            {
+                return null;
+            }
+            set
+            {
+                
+            }
+        }
+
+        #endregion
+
+        #region IMessageCloak Copy Methods
+
+        IObjectMessageCloak IObjectMessageCloak.Copy()
+        {
+            IObjectMessageCloak ocloak = new AMQPObjectMessageCloak(connection, this.objectSerializer.Type);
+            this.CopyInto(ocloak);
+            return ocloak;
+        }
+        
+
+        /// <summary>
+        /// Copies this message, including its object body, into <paramref name="msg"/>.
+        /// </summary>
+        protected override void CopyInto(IMessageCloak msg)
+        {
+            base.CopyInto(msg);
+            IObjectMessageCloak copy = msg as IObjectMessageCloak;
+            if (copy == null)
+            {
+                return;
+            }
+
+            AMQPObjectMessageCloak amqpCopy = copy as AMQPObjectMessageCloak;
+            if (amqpCopy != null)
+            {
+                this.objectSerializer.CopyInto(amqpCopy.objectSerializer);
+            }
+            else
+            {
+                // Copy direction is this -> copy. The previous code wrote the
+                // target's body into this message instead.
+                copy.Body = this.Body;
+            }
+        }
+
+        #endregion
+
+        #region Private Methods
+
+        /// <summary>
+        /// Installs the serializer that matches <paramref name="type"/>.
+        /// </summary>
+        private void InitializeObjectSerializer(AMQPObjectEncodingType type)
+        {
+            switch (type)
+            {
+                case AMQPObjectEncodingType.AMQP_TYPE:
+                    objectSerializer = new AMQPTypeSerializer(this);
+                    break;
+                case AMQPObjectEncodingType.DOTNET_SERIALIZABLE:
+                    objectSerializer = new DotnetObjectSerializer(this);
+                    break;
+                case AMQPObjectEncodingType.JAVA_SERIALIZABLE:
+                    objectSerializer = new JavaObjectSerializer(this);
+                    break;
+                default:
+                    throw NMSExceptionSupport.Create(new ArgumentException("Unsupported object encoding."));
+            }
+        }
+
+        #endregion
+    }
+
+    #region IAMQPObjectSerializer
+
+    #region IAMQPObjectSerializer Interface
+    /// <summary>
+    /// Strategy for storing an object in, and reading it back from, the body of
+    /// an AMQP message.
+    /// </summary>
+    internal interface IAMQPObjectSerializer
+    {
+        /// <summary>Encoding type implemented by this serializer.</summary>
+        AMQPObjectEncodingType Type { get; }
+
+        /// <summary>AMQP message this serializer operates on.</summary>
+        Amqp.Message Message { get; }
+
+        /// <summary>Reads the object from the message body.</summary>
+        object GetObject();
+
+        /// <summary>Writes the object to the message body.</summary>
+        void SetObject(object o);
+
+        /// <summary>Copies this serializer's object into another serializer.</summary>
+        void CopyInto(IAMQPObjectSerializer serializer);
+    }
+
+    #endregion
+
+    #region AMQP Type IAMQPObjectSerializer Implementation
+    /// <summary>
+    /// Serializer that stores the object as a native AMQP value in the message body.
+    /// </summary>
+    class AMQPTypeSerializer : IAMQPObjectSerializer
+    {
+        
+        private readonly Amqp.Message amqpMessage;
+        private readonly AMQPObjectMessageCloak message;
+
+        /// <summary>
+        /// Binds to the cloak's AMQP message and marks it with the AMQP type
+        /// encoding annotation.
+        /// </summary>
+        internal AMQPTypeSerializer(AMQPObjectMessageCloak msg)
+        {
+            amqpMessage = msg.AMQPMessage;
+            message = msg;
+            msg.SetMessageAnnotation(MessageSupport.JMS_AMQP_TYPE_ENCODING, SymbolUtil.BOOLEAN_TRUE);
+        }
+
+        public Message Message { get { return amqpMessage; } }
+
+        public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.AMQP_TYPE; } }
+
+        public void CopyInto(IAMQPObjectSerializer serializer)
+        {
+            object current = GetObject();
+            serializer.SetObject(current);
+        }
+
+        /// <summary>
+        /// Returns the body payload: the value of an amqp-value section, the binary
+        /// of a data section, or the list of an amqp-sequence section.
+        /// </summary>
+        /// <exception cref="IllegalStateException">The body section is of another kind.</exception>
+        public object GetObject()
+        {
+            RestrictedDescribed section = amqpMessage.BodySection;
+            if (section == null)
+            {
+                return null;
+            }
+
+            AmqpValue valueSection = section as AmqpValue;
+            if (valueSection != null)
+            {
+                return valueSection.Value;
+            }
+
+            Data dataSection = section as Data;
+            if (dataSection != null)
+            {
+                return dataSection.Binary;
+            }
+
+            AmqpSequence sequenceSection = section as AmqpSequence;
+            if (sequenceSection != null)
+            {
+                return sequenceSection.List;
+            }
+
+            throw new IllegalStateException("Unexpected body type: " + section.GetType().Name);
+        }
+
+        /// <summary>
+        /// Stores a copy of <paramref name="o"/> as an amqp-value body.
+        /// </summary>
+        /// <exception cref="ArgumentException">The object type is not supported.</exception>
+        public void SetObject(object o)
+        {
+            if (o == null)
+            {
+                amqpMessage.BodySection = MessageSupport.NULL_AMQP_VALUE_BODY;
+                return;
+            }
+
+            if (!IsNMSObjectTypeSupported(o))
+            {
+                throw new ArgumentException("Encoding unexpected object type: " + o.GetType().Name);
+            }
+
+            // Lists and NMS maps are converted to their AMQP form; anything else is used as-is.
+            object amqpForm = o;
+            if (o is IList)
+            {
+                amqpForm = ConversionSupport.ListToAmqp(o as IList);
+            }
+            else if (o is IPrimitiveMap)
+            {
+                amqpForm = ConversionSupport.NMSMapToAmqp(o as IPrimitiveMap);
+            }
+
+            // to copy the object being set encode a message then decode and take body
+            Amqp.Message roundTrip = new Amqp.Message(amqpForm);
+            ByteBuffer encoded = roundTrip.Encode();
+            roundTrip = Message.Decode(encoded);
+
+            amqpMessage.BodySection = new AmqpValue { Value = roundTrip.Body };
+        }
+
+        private bool IsNMSObjectTypeSupported(object o)
+        {
+            if (ConversionSupport.IsNMSType(o))
+            {
+                return true;
+            }
+            return o is List || o is Map || o is IPrimitiveMap || o is IList;
+        }
+    }
+
+    #endregion
+
+    #region Dotnet Serializable IAMQPObjectSerializer Implementation
+
+    /// <summary>
+    /// Serializer that stores the object in a data body section using .NET binary
+    /// serialization.
+    /// </summary>
+    /// <remarks>
+    /// SECURITY NOTE(review): <see cref="BinaryFormatter"/> deserialization of data
+    /// received from a remote peer is unsafe. It is kept here to preserve behavior;
+    /// flagged for follow-up.
+    /// </remarks>
+    class DotnetObjectSerializer : IAMQPObjectSerializer
+    {
+        // Bytes written for a null object: the Java serialization stream header
+        // (0xACED, version 0x0005) followed by the null tag (0x70). This is not a
+        // BinaryFormatter payload, so GetObject must recognise it explicitly.
+        private static readonly byte[] NullObjectMarker = new byte[] { 0xac, 0xed, 0x00, 0x05, 0x70 };
+
+        private readonly Amqp.Message amqpMessage;
+        private readonly AMQPObjectMessageCloak message;
+
+        /// <summary>
+        /// Binds to the cloak's AMQP message and sets the serialized-object content
+        /// type and the .NET encoding annotation on it.
+        /// </summary>
+        internal DotnetObjectSerializer(AMQPObjectMessageCloak msg)
+        {
+            amqpMessage = msg.AMQPMessage;
+            message = msg;
+            msg.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+            msg.SetMessageAnnotation(MessageSupport.JMS_DONET_ENCODING, SymbolUtil.BOOLEAN_TRUE);
+        }
+
+        public Message Message { get { return amqpMessage; } }
+
+        public AMQPObjectEncodingType Type { get { return AMQPObjectEncodingType.DOTNET_SERIALIZABLE; } }
+
+        public void CopyInto(IAMQPObjectSerializer serializer)
+        {
+            serializer.SetObject(GetObject());
+        }
+
+        /// <summary>
+        /// Deserializes the object held in the data body section.
+        /// </summary>
+        /// <returns>
+        /// The object, or null when the body is absent, not a data section, empty,
+        /// or holds the null-object marker written by <see cref="SetObject"/>.
+        /// </returns>
+        public object GetObject()
+        {
+            byte[] bin = null;
+            if(Message.BodySection == null)
+            {
+                return null;
+            }
+            else if (Message.BodySection is Data)
+            {
+                Data data = Message.BodySection as Data;
+                bin = data.Binary;
+            }
+            // TODO handle other body types.
+
+            if (bin == null || bin.Length == 0 || IsNullObjectMarker(bin))
+            {
+                // The marker check keeps SetObject(null) / GetObject() symmetric;
+                // passing the marker to BinaryFormatter would throw.
+                return null;
+            }
+
+            return GetDeserializedObject(bin);
+        }
+
+        /// <summary>
+        /// Serializes <paramref name="o"/> into a data body section.
+        /// </summary>
+        public void SetObject(object o)
+        {
+            byte[] bin = GetSerializedObject(o);
+            if(bin == null || bin.Length == 0)
+            {
+                amqpMessage.BodySection = MessageSupport.EMPTY_DATA;
+            }
+            else
+            {
+                amqpMessage.BodySection = new Data() { Binary = bin };
+            }
+        }
+
+        // True when the payload is exactly the null-object marker.
+        private static bool IsNullObjectMarker(byte[] binary)
+        {
+            if (binary.Length != NullObjectMarker.Length)
+            {
+                return false;
+            }
+            for (int i = 0; i < binary.Length; i++)
+            {
+                if (binary[i] != NullObjectMarker[i])
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        // Deserializes a BinaryFormatter payload; the stream is always disposed.
+        private object GetDeserializedObject(byte[] binary)
+        {
+            using (MemoryStream stream = new MemoryStream(binary))
+            {
+                IFormatter formatter = new BinaryFormatter();
+                return formatter.Deserialize(stream);
+            }
+        }
+
+        // Serializes an object with BinaryFormatter; null yields a fresh copy of the marker.
+        private byte[] GetSerializedObject(object o)
+        {
+            if (o == null)
+            {
+                return (byte[])NullObjectMarker.Clone();
+            }
+
+            using (MemoryStream stream = new MemoryStream())
+            {
+                IFormatter formatter = new BinaryFormatter();
+                formatter.Serialize(stream, o);
+                return stream.ToArray();
+            }
+        }
+    }
+
+    #endregion
+
+    #region Java Serializable IAMQPObjectSerializer Implementation
+
+    /// <summary>
+    /// Placeholder serializer for Java-serialized object bodies. Reading and
+    /// writing the object are not implemented and always throw.
+    /// </summary>
+    class JavaObjectSerializer : IAMQPObjectSerializer
+    {
+        private readonly Amqp.Message amqpMessage;
+        private readonly AMQPObjectMessageCloak message;
+
+        /// <summary>
+        /// Binds to the cloak's AMQP message and sets the serialized-object content
+        /// type and the Java encoding annotation on it.
+        /// </summary>
+        internal JavaObjectSerializer(AMQPObjectMessageCloak msg)
+        {
+            this.amqpMessage = msg.AMQPMessage;
+            this.message = msg;
+            this.message.SetContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+            this.message.SetMessageAnnotation(MessageSupport.JMS_JAVA_ENCODING, SymbolUtil.BOOLEAN_TRUE);
+        }
+
+        public Message Message
+        {
+            get { return this.amqpMessage; }
+        }
+
+        public AMQPObjectEncodingType Type
+        {
+            get { return AMQPObjectEncodingType.JAVA_SERIALIZABLE; }
+        }
+
+        public void CopyInto(IAMQPObjectSerializer serializer)
+        {
+            // TODO fix to copy java serialized object as binary.
+            object current = GetObject();
+            serializer.SetObject(current);
+        }
+
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public object GetObject()
+        {
+            throw new NotImplementedException("Java Serialized Object body Not Supported.");
+        }
+
+        /// <exception cref="NotImplementedException">Always.</exception>
+        public void SetObject(object o)
+        {
+            throw new NotImplementedException("Java Serialized Object body Not Supported.");
+        }
+    }
+
+    #endregion // Java IAMQPObjectSerializer Impl
+
+    #endregion // IAMQPObjectSerializer
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs
new file mode 100644
index 0000000..27dad32
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPStreamMessageCloak.cs
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Framing;
+using Amqp.Types;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+
+    using Cloak;
+    using Util;
+    using Factory;
+    /// <summary>
+    /// Cloak for stream messages. The stream is an <see cref="IList"/> held in the
+    /// AMQP body (amqp-sequence or amqp-value) plus a cursor position.
+    /// </summary>
+    class AMQPStreamMessageCloak : AMQPMessageCloak, IStreamMessageCloak
+    {
+        private IList list;
+        private int position = 0;
+
+        /// <summary>Creates an outgoing stream message with an empty amqp-sequence body.</summary>
+        internal AMQPStreamMessageCloak(Connection c):base(c)
+        {
+            list = InitializeEmptyBody(true);
+        }
+
+        /// <summary>
+        /// Wraps a received message, taking the list from its amqp-sequence or
+        /// amqp-value body, or installing an empty list when the body has none.
+        /// </summary>
+        /// <exception cref="IllegalStateException">The body cannot represent a stream.</exception>
+        internal AMQPStreamMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg)
+        {
+            if (msg.BodySection == null)
+            {
+                list = InitializeEmptyBody(true);
+            }
+            else if (msg.BodySection is AmqpSequence)
+            {
+                IList value = (msg.BodySection as AmqpSequence).List;
+                if(value == null)
+                {
+                    list = InitializeEmptyBody(true);
+                }
+                else
+                {
+                    list = value;
+                }
+            }
+            else if (msg.BodySection is AmqpValue)
+            {
+                object value = (msg.BodySection as AmqpValue).Value;
+                if(value == null)
+                {
+                    list = InitializeEmptyBody(false);
+                }
+                else if (value is IList)
+                {
+                    list = value as IList;
+                }
+                else
+                {
+                    throw new IllegalStateException("Unexpected amqp-value body content type: " + value.GetType().Name);
+                }
+            }
+            else
+            {
+                throw new IllegalStateException("Unexpected message body type: " + msg.BodySection.GetType().Name);
+            }
+        }
+
+        internal override byte JMSMessageType { get { return MessageSupport.JMS_TYPE_STRM; } }
+
+        #region Private Methods
+
+        // Installs a new empty list as the body, wrapped in an amqp-sequence or an
+        // amqp-value section, and returns that list.
+        private List InitializeEmptyBody(bool isSequence)
+        {
+            List l = new List();
+            if (isSequence)
+            {
+                AmqpSequence seq = new Amqp.Framing.AmqpSequence();
+                message.BodySection = seq;
+                seq.List = l;
+                
+            }
+            else
+            {
+                Amqp.Framing.AmqpValue val = new Amqp.Framing.AmqpValue();
+                val.Value = l;
+                message.BodySection = val;
+            }
+            return l;
+        }
+
+        private bool IsEmpty { get { return list.Count <= 0; } }
+
+        #endregion
+
+        #region IStreamMessageCloak Methods
+        /// <summary>True while the cursor points at an existing element.</summary>
+        public bool HasNext { get { return !IsEmpty && position < list.Count; } }
+
+        /// <summary>
+        /// Returns the element at the cursor without advancing. Byte arrays are
+        /// returned as copies.
+        /// </summary>
+        /// <exception cref="EndOfStreamException">The cursor is past the last element.</exception>
+        public object Peek()
+        {
+            if(IsEmpty || position >= list.Count)
+            {
+                throw new EndOfStreamException("Attempt to read past the end of stream");
+            }
+            object value = list[position];
+            if(value != null && value is byte[])
+            {
+                byte[] binary = value as byte[];
+                byte[] bin = new byte[binary.Length];
+                binary.CopyTo(bin, 0);
+                value = bin;
+            }
+            return value;
+        }
+
+        /// <summary>
+        /// Advances the cursor past the current element.
+        /// </summary>
+        /// <exception cref="EndOfStreamException">There is no current element to pop.</exception>
+        public void Pop()
+        {
+            // Same bound as Peek: position == Count means nothing is left to pop.
+            // The former "position > Count" test let the cursor run past the end.
+            if (IsEmpty || position >= list.Count)
+            {
+                throw new EndOfStreamException("Attempt to read past the end of stream");
+            }
+            position++;
+        }
+
+        /// <summary>
+        /// Appends a value to the stream and advances the cursor. Byte arrays are
+        /// stored as copies.
+        /// </summary>
+        public void Put(object value)
+        {
+            object entry = value;
+            if (entry != null && entry is byte[])
+            {
+                byte[] bin = new byte[(entry as byte[]).Length];
+                (entry as byte[]).CopyTo(bin, 0);
+                entry = bin;
+            }
+            if (list.Add(entry) < 0)
+            {
+                // entry may legitimately be null; do not call ToString() on it.
+                throw NMSExceptionSupport.Create(string.Format("Failed to add {0} to stream.", entry ?? "null"), null);
+            }
+            position++;
+        }
+
+        /// <summary>Moves the cursor back to the first element.</summary>
+        public void Reset()
+        {
+            position = 0;
+        }
+
+        /// <summary>Clears the body, empties the list and resets the cursor.</summary>
+        public override void ClearBody()
+        {
+            // NOTE(review): base.ClearBody() assigns Content = null; confirm that this
+            // does not detach the list from the message body section.
+            base.ClearBody();
+            list.Clear();
+            position = 0;
+        }
+
+        IStreamMessageCloak IStreamMessageCloak.Copy()
+        {
+            return base.Copy() as IStreamMessageCloak;
+        }
+
+        /// <summary>
+        /// Copies this message into <paramref name="msg"/>, appending every stream
+        /// element through <c>Put</c> when the target is a stream cloak.
+        /// </summary>
+        protected override void CopyInto(IMessageCloak msg)
+        {
+            base.CopyInto(msg);
+            if(msg is IStreamMessageCloak)
+            {
+                IStreamMessageCloak copy = (msg as IStreamMessageCloak);
+                
+                foreach(object o in list)
+                {
+                    copy.Put(o);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Appends a dump of the stream elements to the base description.
+        /// </summary>
+        public override string ToString()
+        {
+            string result = base.ToString();
+            result += "\nMessage Body: {";
+            foreach(object o in list)
+            {
+                //
+                // handle byte arrays for now
+                // add more special handlers as needed.
+                //
+                if (o == null)
+                {
+                    // Null elements are valid stream entries.
+                    result += "\nnull";
+                }
+                else if (o is byte[])
+                {
+
+                    result += string.Format("\n{0} len={1}: {2}", o.GetType(), (o as byte[]).Length, BitConverter.ToString(o as byte[]).Replace("-", " "));
+                }
+                else
+                {
+                    result += string.Format("\n{0}: {1}", o.GetType(), o.ToString());
+                }
+            }
+            result += "\n}";
+            return result;
+        }
+        #endregion
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs b/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs
new file mode 100644
index 0000000..dd093bc
--- /dev/null
+++ b/src/main/csharp/Message/AMQP/AMQPTextMessageCloak.cs
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Amqp.Types;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP.Message.AMQP
+{
+    using Cloak;
+    using Util;
+    using Factory;
+
+    /// <summary>
+    /// Cloak for text messages. Text is written as an amqp-value string and read
+    /// from either an amqp-value (string or binary) or a data body section.
+    /// </summary>
+    class AMQPTextMessageCloak : AMQPMessageCloak, ITextMessageCloak
+    {
+        #region Constructor
+
+        internal AMQPTextMessageCloak(Connection c) : base(c) {}
+
+        internal AMQPTextMessageCloak(MessageConsumer mc, Amqp.Message msg) : base(mc, msg) {}
+
+        #endregion
+
+        internal override byte JMSMessageType
+        {
+            get
+            {
+                return MessageSupport.JMS_TYPE_TXT;
+            }
+        }
+
+        /// <summary>
+        /// Message text. The getter decodes the current body; the setter replaces
+        /// the body with an amqp-value section holding the string.
+        /// </summary>
+        public string Text
+        {
+            get
+            {
+                return GetTextFromBody();
+            }
+
+            set
+            {
+                AmqpValue val = new AmqpValue();
+                val.Value = value;
+                this.message.BodySection = val;
+            }
+        }
+        
+        ITextMessageCloak ITextMessageCloak.Copy()
+        {
+            ITextMessageCloak tcloak = new AMQPTextMessageCloak(connection);
+            CopyInto(tcloak);
+            return tcloak;
+        }
+        
+        /// <summary>
+        /// Copies this message into <paramref name="msg"/>; the text is copied only
+        /// when the target is a text cloak, as the sibling cloaks do for their bodies.
+        /// </summary>
+        protected override void CopyInto(IMessageCloak msg)
+        {
+            base.CopyInto(msg);
+            ITextMessageCloak textCopy = msg as ITextMessageCloak;
+            if (textCopy != null)
+            {
+                textCopy.Text = Text;
+            }
+        }
+
+        // Decodes a UTF-8 payload; null or empty input yields the empty string.
+        private static string DecodeBinaryBody(byte[] body)
+        {
+            string result = string.Empty;
+            if(body != null && body.Length > 0)
+            {
+                result = Encoding.UTF8.GetString(body);
+            }
+            return result;
+        }
+
+        // Extracts the text from the body section. A missing body or a null
+        // amqp-value yields the empty string; unsupported content throws.
+        private string GetTextFromBody()
+        {
+            string result = string.Empty;
+            RestrictedDescribed body = this.message.BodySection;
+            if(body == null)
+            {
+                return result;
+            }
+            else if (body is Data)
+            {
+                byte[] data = (body as Data).Binary;
+                result = DecodeBinaryBody(data);
+            }
+            else if(body is AmqpValue)
+            {
+                object value = (body as AmqpValue).Value;
+                if(value == null)
+                {
+                    return result;
+                }
+                else if (value is byte[])
+                {
+                    result = DecodeBinaryBody(value as byte[]);
+                }
+                else if (value is string)
+                {
+                    result = value as string;
+                }
+                else
+                {
+                    throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
+                }
+            }
+            else
+            {
+                throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
+            }
+
+
+            return result;
+        }
+
+        /// <summary>
+        /// Appends the message text to the base description.
+        /// </summary>
+        public override string ToString()
+        {
+            string result = base.ToString();
+            // Decode the body once rather than once for the check and once for the output.
+            string text = this.Text;
+            if (text != null)
+            {
+                result += string.Format("\nMessage Body: {0}\n", text);
+            }
+            return result;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/BytesMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/BytesMessage.cs b/src/main/csharp/Message/BytesMessage.cs
new file mode 100644
index 0000000..8186214
--- /dev/null
+++ b/src/main/csharp/Message/BytesMessage.cs
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+    using Cloak;
+    /// <summary>
+    /// Apache.NMS.AMQP.Message.BytesMessage inherits from Apache.NMS.AMQP.Message.Message and implements the Apache.NMS.IBytesMessage interface.
+    /// The body is read and written through the BinaryReader / BinaryWriter handed out by an
+    /// Apache.NMS.AMQP.Message.Cloak.IBytesMessageCloak, which keeps this class detached from the underlying AMQP 1.0 engine.
+    /// </summary>
+    class BytesMessage : Message, IBytesMessage
+    {
+        private BinaryWriter dataOut = null;
+        private BinaryReader dataIn = null;
+        private MemoryStream outputBuffer = null;
+        private readonly new IBytesMessageCloak cloak;
+
+        #region Constructor
+
+        /// <summary>
+        /// Creates a bytes message on top of the given provider cloak.
+        /// </summary>
+        internal BytesMessage(IBytesMessageCloak message) : base(message)
+        {
+            cloak = message;
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Creates a new BytesMessage wrapping a copy of this message's cloak.
+        /// </summary>
+        internal override Message Copy()
+        {
+            return new BytesMessage(this.cloak.Copy());
+        }
+
+        #region Private Methods
+
+        /// <summary>
+        /// Ensures the body may be read and lazily obtains the reader from the cloak.
+        /// </summary>
+        private void InitializeReadingMode()
+        {
+            FailIfWriteOnlyMsgBody();
+            if(dataIn == null || dataIn.BaseStream == null)
+            {
+                dataIn = cloak.getDataReader();
+            }
+        }
+
+        /// <summary>
+        /// Ensures the body may be written and lazily obtains the writer from the cloak.
+        /// </summary>
+        private void InitializeWritingMode()
+        {
+            FailIfReadOnlyMsgBody();
+            if(dataOut == null )
+            {
+                dataOut = cloak.getDataWriter();
+            }
+        }
+
+        /// <summary>
+        /// Closes the current writer and, when a local output buffer exists, stores its bytes as the content.
+        /// </summary>
+        private void StoreContent()
+        {
+            if(dataOut == null)
+            {
+                return;
+            }
+
+            dataOut.Close();
+
+            // outputBuffer is never assigned a stream inside this class, so it must be
+            // checked before use to avoid a NullReferenceException.
+            if(outputBuffer != null)
+            {
+                base.Content = outputBuffer.ToArray();
+            }
+
+            dataOut = null;
+            outputBuffer = null;
+        }
+
+        /// <summary>
+        /// Runs a single read against the body reader and maps stream failures to NMS exceptions.
+        /// </summary>
+        /// <param name="read">Operation to run against the reader.</param>
+        /// <returns>Whatever <paramref name="read"/> returns.</returns>
+        private T ReadValue<T>(Func<BinaryReader, T> read)
+        {
+            InitializeReadingMode();
+            try
+            {
+                return read(dataIn);
+            }
+            catch (EndOfStreamException e)
+            {
+                // EndOfStreamException derives from IOException, so it has to be handled first.
+                throw NMSExceptionSupport.CreateMessageEOFException(e);
+            }
+            catch (IOException e)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(e);
+            }
+        }
+
+        /// <summary>
+        /// Runs a single write against the body writer and converts any failure through NMSExceptionSupport.Create.
+        /// </summary>
+        /// <param name="write">Operation to run against the writer.</param>
+        private void WriteValue(Action<BinaryWriter> write)
+        {
+            InitializeWritingMode();
+            try
+            {
+                write(dataOut);
+            }
+            catch (Exception e)
+            {
+                throw NMSExceptionSupport.Create(e);
+            }
+        }
+
+        #endregion
+
+        #region IBytesMessage Properties
+
+        /// <summary>
+        /// Gets the body as a byte array, or writes the given bytes to the body.
+        /// The getter returns null when the cloak reports a body length of zero; otherwise it
+        /// issues one Read call of BodyLength bytes against the body reader.
+        /// The setter ignores a null value.
+        /// </summary>
+        public override byte[] Content
+        {
+            get
+            {
+                byte[] buffer = null;
+                InitializeReadingMode();
+                if(this.cloak.BodyLength != 0)
+                {
+                    buffer = new byte[this.cloak.BodyLength];
+                    dataIn.Read(buffer, 0, buffer.Length);
+                }
+                return buffer;
+            }
+            set
+            {
+                InitializeWritingMode();
+                if(value != null)
+                {
+                    this.dataOut.Write(value, 0, value.Length);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the body length reported by the cloak. The message must be readable.
+        /// </summary>
+        public long BodyLength
+        {
+            get
+            {
+                InitializeReadingMode();
+                return this.cloak.BodyLength;
+            }
+        }
+
+        #endregion
+
+        #region IBytesMessage Methods
+
+        /// <summary>
+        /// Drops the cached reader and writer, marks the message writable and clears the body in the base class.
+        /// </summary>
+        public override void ClearBody()
+        {
+            dataIn = null;
+            dataOut = null;
+            outputBuffer = null;
+            IsReadOnly = false;
+            base.ClearBody();
+        }
+
+        /// <summary>
+        /// Drops the cached reader and writer, resets the cloak and marks the message read-only.
+        /// </summary>
+        public void Reset()
+        {
+            dataIn = null;
+            dataOut = null;
+            outputBuffer = null;
+            this.cloak.Reset();
+            IsReadOnly = true;
+        }
+
+        /// <summary>Reads a boolean from the body.</summary>
+        public bool ReadBoolean()
+        {
+            return ReadValue(reader => reader.ReadBoolean());
+        }
+
+        /// <summary>Reads a single byte from the body.</summary>
+        public byte ReadByte()
+        {
+            return ReadValue(reader => reader.ReadByte());
+        }
+
+        /// <summary>Reads up to value.Length bytes into <paramref name="value"/>.</summary>
+        /// <returns>The number of bytes read, as reported by the reader.</returns>
+        public int ReadBytes(byte[] value)
+        {
+            return ReadValue(reader => reader.Read(value, 0, value.Length));
+        }
+
+        /// <summary>Reads up to <paramref name="length"/> bytes into the start of <paramref name="value"/>.</summary>
+        /// <returns>The number of bytes read, as reported by the reader.</returns>
+        public int ReadBytes(byte[] value, int length)
+        {
+            return ReadValue(reader => reader.Read(value, 0, length));
+        }
+
+        /// <summary>Reads a character from the body.</summary>
+        public char ReadChar()
+        {
+            return ReadValue(reader => reader.ReadChar());
+        }
+
+        /// <summary>Reads a double from the body.</summary>
+        public double ReadDouble()
+        {
+            return ReadValue(reader => reader.ReadDouble());
+        }
+
+        /// <summary>Reads a 16-bit integer from the body.</summary>
+        public short ReadInt16()
+        {
+            return ReadValue(reader => reader.ReadInt16());
+        }
+
+        /// <summary>Reads a 32-bit integer from the body.</summary>
+        public int ReadInt32()
+        {
+            return ReadValue(reader => reader.ReadInt32());
+        }
+
+        /// <summary>Reads a 64-bit integer from the body.</summary>
+        public long ReadInt64()
+        {
+            return ReadValue(reader => reader.ReadInt64());
+        }
+
+        /// <summary>Reads a single-precision float from the body.</summary>
+        public float ReadSingle()
+        {
+            return ReadValue(reader => reader.ReadSingle());
+        }
+
+        /// <summary>Reads a string from the body.</summary>
+        public string ReadString()
+        {
+            // Note if dataIn is an EndianBinaryReader the string length is read as 16bit short
+            return ReadValue(reader => reader.ReadString());
+        }
+
+        /// <summary>Writes a boolean to the body.</summary>
+        public void WriteBoolean(bool value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes a single byte to the body.</summary>
+        public void WriteByte(byte value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes the whole array to the body.</summary>
+        public void WriteBytes(byte[] value)
+        {
+            WriteValue(writer => writer.Write(value, 0, value.Length));
+        }
+
+        /// <summary>Writes <paramref name="length"/> bytes of <paramref name="value"/>, starting at <paramref name="offset"/>.</summary>
+        public void WriteBytes(byte[] value, int offset, int length)
+        {
+            WriteValue(writer => writer.Write(value, offset, length));
+        }
+
+        /// <summary>Writes a character to the body.</summary>
+        public void WriteChar(char value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes a double to the body.</summary>
+        public void WriteDouble(double value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes a 16-bit integer to the body.</summary>
+        public void WriteInt16(short value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes a 32-bit integer to the body.</summary>
+        public void WriteInt32(int value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes a 64-bit integer to the body.</summary>
+        public void WriteInt64(long value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>
+        /// Writes a boxed value by dispatching to the matching typed writer.
+        /// Supported types: byte[], string, byte, char, bool, short, int, long, float and double.
+        /// </summary>
+        /// <param name="value">The value to write; must not be null.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
+        /// <exception cref="MessageFormatException">The runtime type of <paramref name="value"/> is not supported.</exception>
+        public void WriteObject(object value)
+        {
+            // Check writability before inspecting the argument so a read-only
+            // body is reported first.
+            InitializeWritingMode();
+
+            if(value == null)
+            {
+                throw new ArgumentNullException("value");
+            }
+
+            if(value is byte[])
+            {
+                WriteBytes((byte[])value);
+            }
+            else if (value is String)
+            {
+                // System.String is not a primitive type (Type.IsPrimitive is false),
+                // so it has to be tested outside the primitive branch.
+                WriteString((string)value);
+            }
+            else if (value is Byte)
+            {
+                WriteByte((byte)value);
+            }
+            else if (value is Char)
+            {
+                WriteChar((char)value);
+            }
+            else if (value is Boolean)
+            {
+                WriteBoolean((bool)value);
+            }
+            else if (value is Int16)
+            {
+                WriteInt16((short)value);
+            }
+            else if (value is Int32)
+            {
+                WriteInt32((int)value);
+            }
+            else if (value is Int64)
+            {
+                WriteInt64((long)value);
+            }
+            else if (value is Single)
+            {
+                WriteSingle((float)value);
+            }
+            else if (value is Double)
+            {
+                WriteDouble((double)value);
+            }
+            else
+            {
+                Type objType = value.GetType();
+                if (objType.IsPrimitive)
+                {
+                    throw new MessageFormatException("Cannot write primitive type:" + objType);
+                }
+                throw new MessageFormatException("Cannot write non-primitive type:" + objType);
+            }
+        }
+
+        /// <summary>Writes a single-precision float to the body.</summary>
+        public void WriteSingle(float value)
+        {
+            WriteValue(writer => writer.Write(value));
+        }
+
+        /// <summary>Writes a string to the body.</summary>
+        public void WriteString(string value)
+        {
+            // note if dataOut is an EndianBinaryWriter, strings are written with a 16bit short length.
+            WriteValue(writer => writer.Write(value));
+        }
+
+        #endregion
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs b/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs
new file mode 100644
index 0000000..118b5ab
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IBytesMessageCloak.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.IO;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+    /// <summary>
+    /// Provider-side contract for a bytes message body. BytesMessage obtains its
+    /// BinaryReader and BinaryWriter from this interface rather than accessing the
+    /// underlying message directly.
+    /// </summary>
+    internal interface IBytesMessageCloak : IMessageCloak
+    {
+        /// <summary>Returns the reader BytesMessage uses for all Read* operations.</summary>
+        // NOTE(review): whether each call returns a fresh reader or a shared one is decided by the implementer - confirm.
+        BinaryReader getDataReader();
+        /// <summary>Returns the writer BytesMessage uses for all Write* operations.</summary>
+        BinaryWriter getDataWriter();
+
+        /// <summary>Copy of this cloak, typed as a bytes message cloak (hides IMessageCloak.Copy).</summary>
+        new IBytesMessageCloak Copy();
+
+        /// <summary>Body length; BytesMessage uses it to size the buffer returned by its Content getter.</summary>
+        // NOTE(review): presumably a byte count - confirm against the implementation.
+        int BodyLength { get; }
+        /// <summary>Invoked by BytesMessage.Reset() before the message is marked read-only.</summary>
+        void Reset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IMapMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IMapMessageCloak.cs b/src/main/csharp/Message/Cloak/IMapMessageCloak.cs
new file mode 100644
index 0000000..acb649a
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IMapMessageCloak.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+    /// <summary>
+    /// Provider-side contract for a map message: an IMessageCloak that additionally
+    /// exposes an IPrimitiveMap.
+    /// </summary>
+    interface IMapMessageCloak : IMessageCloak
+    {
+        /// <summary>The primitive map exposed by this cloak.</summary>
+        // NOTE(review): presumably the message body; whether it is a live view or a snapshot is not visible here - confirm.
+        IPrimitiveMap Map { get; }
+        /// <summary>Copy of this cloak, typed as a map message cloak (hides IMessageCloak.Copy).</summary>
+        new IMapMessageCloak Copy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IMessageCloak.cs b/src/main/csharp/Message/Cloak/IMessageCloak.cs
new file mode 100644
index 0000000..623449c
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IMessageCloak.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+    /// <summary>
+    /// Provider-specific message contract ("cloak"). It extends the NMS IMessage
+    /// interface with additional members and is the base of the typed cloak
+    /// interfaces in this namespace (bytes, map, object and stream).
+    /// </summary>
+    interface IMessageCloak : IMessage
+    {
+        /// <summary>Message body as a byte array.</summary>
+        // NOTE(review): encoding and null/empty behaviour are defined by the implementer - confirm.
+        byte[] Content
+        {
+            get;
+            set;
+        }
+
+        /// <summary>Read-only flag for the message body.</summary>
+        // NOTE(review): presumably consulted before body writes - confirm against callers.
+        bool IsBodyReadOnly { get; set; }
+
+        /// <summary>Read-only flag for the message properties.</summary>
+        bool IsPropertiesReadOnly { get; set; }
+
+        /// <summary>Received flag; read-only on this interface.</summary>
+        // NOTE(review): presumably true for messages delivered by a consumer - confirm.
+        bool IsReceived { get; }
+
+        /// <summary>Returns a copy of this cloak. Typed cloak interfaces hide this member with a more specific return type.</summary>
+        IMessageCloak Copy();
+
+        /// <summary>Gets the message annotation stored under <paramref name="symbolKey"/>.</summary>
+        // NOTE(review): result for a missing key is not visible here - confirm.
+        object GetMessageAnnotation(string symbolKey);
+
+        /// <summary>Sets the message annotation stored under <paramref name="symbolKey"/>.</summary>
+        void SetMessageAnnotation(string symbolKey, object value);
+
+        /// <summary>Gets the delivery annotation stored under <paramref name="symbolKey"/>.</summary>
+        object GetDeliveryAnnotation(string symbolKey);
+
+        /// <summary>Sets the delivery annotation stored under <paramref name="symbolKey"/>.</summary>
+        void SetDeliveryAnnotation(string symbolKey, object value);
+
+        /// <summary>Delivery count of the message.</summary>
+        // NOTE(review): relationship to RedeliveryCount (e.g. off by one) is not visible here - confirm.
+        int DeliveryCount { get; set; }
+
+        /// <summary>Redelivery count of the message.</summary>
+        int RedeliveryCount { get; set; }
+
+        /// <summary>Acknowledgement handler associated with this message.</summary>
+        MessageAcknowledgementHandler AckHandler { get; set; }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs b/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs
new file mode 100644
index 0000000..fcf4c97
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IObjectMessageCloak.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Runtime.Serialization;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+    /// <summary>
+    /// Encoding kinds reported by IObjectMessageCloak.Type.
+    /// NOTE(review): the per-member meanings below are inferred from the names - confirm against the implementing cloak.
+    /// </summary>
+    internal enum AMQPObjectEncodingType
+    {
+        // Encoding not determined. The spelling "UNKOWN" is part of the identifier and is
+        // left untouched because code outside this file may reference it.
+        UNKOWN = -1,
+        // Presumably: body carried as a native AMQP type.
+        AMQP_TYPE = 0,
+        // Presumably: body carried as a .NET-serialized object.
+        DOTNET_SERIALIZABLE = 1,
+        // Presumably: body carried as a Java-serialized object.
+        JAVA_SERIALIZABLE = 2,
+    }
+    /// <summary>
+    /// Provider-side contract for an object message: an IMessageCloak that exposes
+    /// an object body together with the encoding type used for it.
+    /// </summary>
+    interface IObjectMessageCloak : IMessageCloak
+    {
+        /// <summary>Copy of this cloak, typed as an object message cloak (hides IMessageCloak.Copy).</summary>
+        new IObjectMessageCloak Copy();
+        /// <summary>The object carried by the message.</summary>
+        // NOTE(review): which object types are accepted depends on the encoding - confirm in the implementation.
+        object Body { get; set; }
+
+        /// <summary>Encoding type of the body; read-only on this interface.</summary>
+        AMQPObjectEncodingType Type { get; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs b/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs
new file mode 100644
index 0000000..8775d1b
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/IStreamMessageCloak.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.IO;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+    /// <summary>
+    /// Provider-side contract for a stream message: an IMessageCloak with
+    /// Put / Peek / Pop access to its elements.
+    /// NOTE(review): ordering and end-of-stream behaviour are defined by the implementer - confirm.
+    /// </summary>
+    interface IStreamMessageCloak : IMessageCloak
+    {
+
+
+        /// <summary>Copy of this cloak, typed as a stream message cloak (hides IMessageCloak.Copy).</summary>
+        new IStreamMessageCloak Copy();
+
+        /// <summary>Whether another element is available.</summary>
+        // NOTE(review): presumably relative to the current read position - confirm.
+        bool HasNext { get; }
+
+        /// <summary>Resets the cloak.</summary>
+        // NOTE(review): presumably rewinds the read position to the first element - confirm.
+        void Reset();
+
+        /// <summary>Adds <paramref name="value"/> to the stream body.</summary>
+        void Put(object value);
+
+        /// <summary>Returns an element without removing it.</summary>
+        // NOTE(review): presumably the element Pop() would discard next; behaviour on an empty stream is not visible here.
+        object Peek();
+
+        /// <summary>Removes an element; nothing is returned, so callers read it with Peek() first.</summary>
+        void Pop();
+
+        
+    }
+}