You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:47 UTC

[10/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Cloak/ITextMessageCloak.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Cloak/ITextMessageCloak.cs b/src/main/csharp/Message/Cloak/ITextMessageCloak.cs
new file mode 100644
index 0000000..b85f947
--- /dev/null
+++ b/src/main/csharp/Message/Cloak/ITextMessageCloak.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Apache.NMS.AMQP.Message.Cloak
+{
+    /// <summary>
+    /// Cloak contract for text messages: extends IMessageCloak with a string-valued Text member.
+    /// </summary>
+    interface ITextMessageCloak : IMessageCloak
+    {
+        /// <summary>
+        /// Gets or sets the string value carried by this cloak.
+        /// </summary>
+        string Text { get; set; }
+        /// <summary>
+        /// Copy operation typed as ITextMessageCloak. The <c>new</c> modifier indicates it hides
+        /// a Copy member inherited from IMessageCloak (presumably one returning the base cloak type — confirm).
+        /// </summary>
+        new ITextMessageCloak Copy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Factory/AMQPMessageFactory.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Factory/AMQPMessageFactory.cs b/src/main/csharp/Message/Factory/AMQPMessageFactory.cs
new file mode 100644
index 0000000..8b0268d
--- /dev/null
+++ b/src/main/csharp/Message/Factory/AMQPMessageFactory.cs
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP;
+using Amqp;
+
+namespace Apache.NMS.AMQP.Message.Factory
+{
+    using Cloak;
+    using AMQP;
+    /// <summary>
+    /// Message factory for a connection-level resource. Each Create* method wraps a new AMQP cloak,
+    /// constructed with <see cref="Parent"/>, in the matching NMS message class.
+    /// </summary>
+    class AMQPMessageFactory<T> : MessageFactory<T> where T : ConnectionInfo
+    {
+
+        protected readonly AMQPMessageTransformation<T> transformFactory;
+        // Encoding handed to every object message cloak; resolved once by the constructor.
+        protected AMQPObjectEncodingType encodingType = AMQPObjectEncodingType.UNKOWN;
+
+        /// <summary>
+        /// Creates the factory for <paramref name="resource"/>, builds its transformation helper and
+        /// resolves the object encoding type from the parent connection's properties.
+        /// </summary>
+        /// <param name="resource">The resource this factory is bound to.</param>
+        internal AMQPMessageFactory(NMSResource<T> resource) : base(resource)
+        {
+            transformFactory = new AMQPMessageTransformation<T>(this);
+            InitEncodingType();
+        }
+
+        /// <summary>The transformation helper created by the constructor.</summary>
+        internal MessageTransformation TransformFactory { get { return transformFactory; } }
+
+        /// <summary>
+        /// The parent resource viewed as a Connection; null when the parent is not a Connection
+        /// (the value comes from an <c>as</c> cast).
+        /// </summary>
+        internal Connection Parent { get { return parent as Connection; } }
+
+        /// <summary>Returns the transformation helper created by the constructor.</summary>
+        public override MessageTransformation GetTransformFactory()
+        {
+            return transformFactory;
+        }
+
+        /// <summary>Creates a bytes message backed by a new AMQPBytesMessageCloak.</summary>
+        public override IBytesMessage CreateBytesMessage()
+        {
+            IBytesMessageCloak cloak = new AMQPBytesMessageCloak(Parent);
+            return new BytesMessage(cloak);
+        }
+
+        /// <summary>Creates a bytes message and writes <paramref name="body"/> into it.</summary>
+        /// <param name="body">The bytes passed to WriteBytes on the new message.</param>
+        public override IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            IBytesMessage msg = CreateBytesMessage();
+            msg.WriteBytes(body);
+            return msg;
+        }
+
+        /// <summary>Creates a map message backed by a new AMQPMapMessageCloak.</summary>
+        public override IMapMessage CreateMapMessage()
+        {
+            IMapMessageCloak cloak = new AMQPMapMessageCloak(Parent);
+            return new MapMessage(cloak);
+        }
+
+        /// <summary>Creates a plain message backed by a new AMQPMessageCloak.</summary>
+        public override IMessage CreateMessage()
+        {
+            IMessageCloak cloak = new AMQPMessageCloak(Parent);
+            return new Message(cloak);
+        }
+
+        /// <summary>
+        /// Creates an object message whose cloak uses this factory's encoding type, then assigns
+        /// <paramref name="body"/> to the message's Body.
+        /// </summary>
+        /// <param name="body">The object assigned to the new message's Body.</param>
+        public override IObjectMessage CreateObjectMessage(object body)
+        {
+            IObjectMessageCloak cloak = new AMQPObjectMessageCloak(Parent, encodingType);
+            return new ObjectMessage(cloak) { Body=body };
+        }
+
+        /// <summary>Creates a stream message backed by a new AMQPStreamMessageCloak.</summary>
+        public override IStreamMessage CreateStreamMessage()
+        {
+            IStreamMessageCloak cloak = new AMQPStreamMessageCloak(Parent);
+            return new StreamMessage(cloak);
+        }
+
+        /// <summary>Creates a text message backed by a new AMQPTextMessageCloak.</summary>
+        public override ITextMessage CreateTextMessage()
+        {
+            ITextMessageCloak cloak = new AMQPTextMessageCloak(Parent);
+            return new TextMessage(cloak);
+        }
+
+        /// <summary>Creates a text message and sets its Text to <paramref name="text"/>.</summary>
+        /// <param name="text">The value assigned to the new message's Text.</param>
+        public override ITextMessage CreateTextMessage(string text)
+        {
+            ITextMessage msg = CreateTextMessage();
+            msg.Text = text;
+            return msg;
+        }
+
+        /// <summary>
+        /// Resolves the encoding type from the parent connection and logs the result.
+        /// </summary>
+        private void InitEncodingType()
+        {
+            encodingType = ConnectionEncodingType(Parent);
+            Tracer.InfoFormat("Message Serialization for connection : {0}, is set to: {1}.", Parent.ClientId, encodingType.ToString());
+        }
+
+
+        // Prefixes recognised in the connection's object-serialization property value.
+        private const string AMQP_TYPE = "amqp";
+        private const string DOTNET_TYPE = "dotnet";
+        private const string JAVA_TYPE = "java";
+
+        /// <summary>
+        /// Maps the connection's object-serialization property to an encoding type.
+        /// </summary>
+        /// <param name="connection">The connection whose Properties are read.</param>
+        /// <returns>
+        /// The encoding whose keyword prefixes the property value (case-insensitive), or
+        /// AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE when the value is null or matches no keyword.
+        /// </returns>
+        private static AMQPObjectEncodingType ConnectionEncodingType(Connection connection)
+        {
+            string value = connection.Properties[Connection.MESSAGE_OBJECT_SERIALIZATION_PROP];
+            if (value == null)
+            {
+                return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
+            }
+
+            // The keywords are machine-readable configuration tokens, so compare them ordinally
+            // and case-insensitively instead of lower-casing with the current culture.
+            if (value.StartsWith(AMQP_TYPE, StringComparison.OrdinalIgnoreCase))
+            {
+                return AMQPObjectEncodingType.AMQP_TYPE;
+            }
+            if (value.StartsWith(DOTNET_TYPE, StringComparison.OrdinalIgnoreCase))
+            {
+                return AMQPObjectEncodingType.DOTNET_SERIALIZABLE;
+            }
+            if (value.StartsWith(JAVA_TYPE, StringComparison.OrdinalIgnoreCase))
+            {
+                return AMQPObjectEncodingType.JAVA_SERIALIZABLE;
+            }
+
+            return AMQPObjectMessageCloak.DEFAULT_ENCODING_TYPE;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Factory/IMessageFactory.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Factory/IMessageFactory.cs b/src/main/csharp/Message/Factory/IMessageFactory.cs
new file mode 100644
index 0000000..ed9b2d7
--- /dev/null
+++ b/src/main/csharp/Message/Factory/IMessageFactory.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Message.Factory
+{
+    /// <summary>
+    /// Factory contract for creating NMS message objects and for exposing the factory's MessageTransformation.
+    /// </summary>
+    interface IMessageFactory 
+    {
+        /// <summary>
+        /// Returns the MessageTransformation exposed by this factory
+        /// </summary>
+        MessageTransformation GetTransformFactory();
+
+        // Factory methods to create messages
+
+        /// <summary>
+        /// Creates a new message with an empty body
+        /// </summary>
+        IMessage CreateMessage();
+
+        /// <summary>
+        /// Creates a new text message with an empty body
+        /// </summary>
+        ITextMessage CreateTextMessage();
+
+        /// <summary>
+        /// Creates a new text message with the given body
+        /// </summary>
+        ITextMessage CreateTextMessage(string text);
+
+        /// <summary>
+        /// Creates a new Map message which contains primitive key and value pairs
+        /// </summary>
+        IMapMessage CreateMapMessage();
+
+        /// <summary>
+        /// Creates a new Object message containing the given .NET object as the body
+        /// </summary>
+        IObjectMessage CreateObjectMessage(object body);
+
+        /// <summary>
+        /// Creates a new binary message
+        /// </summary>
+        IBytesMessage CreateBytesMessage();
+
+        /// <summary>
+        /// Creates a new binary message with the given body
+        /// </summary>
+        IBytesMessage CreateBytesMessage(byte[] body);
+
+        /// <summary>
+        /// Creates a new stream message
+        /// </summary>
+        IStreamMessage CreateStreamMessage();
+
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Factory/MessageFactory.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Factory/MessageFactory.cs b/src/main/csharp/Message/Factory/MessageFactory.cs
new file mode 100644
index 0000000..49e9b43
--- /dev/null
+++ b/src/main/csharp/Message/Factory/MessageFactory.cs
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message.Factory
+{
+    /// <summary>
+    /// Abstract base for message factories. Holds a static registry mapping a resource Id to the
+    /// factory created for that resource, and declares the creation methods that concrete
+    /// factories implement.
+    /// </summary>
+    internal abstract class MessageFactory<T> : IMessageFactory where T : ResourceInfo
+    {
+        // Id -> factory lookup. The instance is a ConcurrentDictionary but is used through IDictionary.
+        private static readonly IDictionary<Id, IMessageFactory> registry;
+
+        static MessageFactory()
+        {
+            registry = new ConcurrentDictionary<Id, IMessageFactory>();
+        }
+
+        /// <summary>
+        /// Creates an AMQPMessageFactory for <paramref name="resource"/> and stores it under the resource's Id.
+        /// </summary>
+        /// <param name="resource">The resource to register; must be a Connection.</param>
+        /// <exception cref="NMSException">Thrown when <paramref name="resource"/> is not a Connection.</exception>
+        public static void Register(NMSResource<T> resource)
+        {
+            if (!(resource is Connection))
+            {
+                throw new NMSException("Invalid Message Factory Type " + resource.GetType().FullName);
+            }
+
+            // Read the key before constructing the factory, matching the argument evaluation order of a direct Add call.
+            var key = resource.Id;
+            IMessageFactory created = new AMQPMessageFactory<ConnectionInfo>(resource as Connection);
+            registry.Add(key, created);
+        }
+
+        /// <summary>
+        /// Removes the registry entry for <paramref name="resource"/>. A warning is logged only when the
+        /// removal reports failure while the key is still present. Null resources and null Ids are ignored.
+        /// </summary>
+        /// <param name="resource">The resource whose factory entry should be removed.</param>
+        public static void Unregister(NMSResource<T> resource)
+        {
+            if (resource != null && resource.Id != null)
+            {
+                bool removed = registry.Remove(resource.Id);
+                if (!removed && registry.ContainsKey(resource.Id))
+                {
+                    Tracer.WarnFormat("MessageFactory was not able to unregister resource {0}.", resource.Id);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Looks up the factory stored under the Id of <paramref name="resource"/>.
+        /// </summary>
+        /// <param name="resource">The connection whose factory is requested.</param>
+        /// <returns>The registered factory.</returns>
+        /// <exception cref="NMSException">Thrown when no non-null factory is stored for the resource.</exception>
+        public static IMessageFactory Instance(Connection resource)
+        {
+            IMessageFactory found;
+            bool present = registry.TryGetValue(resource.Id, out found);
+            if (!present || found == null)
+            {
+                throw new NMSException("Resource "+resource+" is not registered as message factory.");
+            }
+            return found;
+        }
+
+        // The resource this factory instance was created for.
+        protected readonly NMSResource<T> parent;
+
+        /// <summary>
+        /// Stores <paramref name="resource"/> as this factory's parent.
+        /// </summary>
+        protected MessageFactory(NMSResource<T> resource)
+        {
+            parent = resource;
+        }
+
+        // Creation contract fulfilled by concrete factories.
+        public abstract MessageTransformation GetTransformFactory();
+        public abstract IMessage CreateMessage();
+        public abstract ITextMessage CreateTextMessage();
+        public abstract ITextMessage CreateTextMessage(string text);
+        public abstract IMapMessage CreateMapMessage();
+        public abstract IObjectMessage CreateObjectMessage(object body);
+        public abstract IBytesMessage CreateBytesMessage();
+        public abstract IBytesMessage CreateBytesMessage(byte[] body);
+        public abstract IStreamMessage CreateStreamMessage();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/MapMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/MapMessage.cs b/src/main/csharp/Message/MapMessage.cs
new file mode 100644
index 0000000..8679e3a
--- /dev/null
+++ b/src/main/csharp/Message/MapMessage.cs
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Message.Cloak;
+
+namespace Apache.NMS.AMQP.Message
+{
+    /// <summary>
+    /// Map message for the AMQP provider: derives from Apache.NMS.AMQP.Message.Message and implements Apache.NMS.IMapMessage.
+    /// The map content is reached through an Apache.NMS.AMQP.Message.Cloak.IMapMessageCloak.
+    /// </summary>
+    class MapMessage : Message, IMapMessage
+    {
+        // Hides the base-class cloak field with the map-specific cloak type.
+        private new readonly IMapMessageCloak cloak;
+        // Created on first access of Body.
+        private PrimitiveMapInterceptor map;
+
+        internal MapMessage(IMapMessageCloak message) : base(message)
+        {
+            cloak = message;
+        }
+
+        /// <summary>
+        /// Read-only flag of the message. Setting it updates the body interceptor first (when one
+        /// has already been created) and then the base-class flag.
+        /// </summary>
+        public override bool IsReadOnly
+        {
+            get => base.IsReadOnly;
+            internal set
+            {
+                PrimitiveMapInterceptor current = map;
+                if (current != null)
+                {
+                    current.ReadOnly = value;
+                }
+                base.IsReadOnly = value;
+            }
+        }
+
+        /// <summary>
+        /// The map body. The interceptor over the cloak's map is built on first access and reused afterwards.
+        /// </summary>
+        public IPrimitiveMap Body
+        {
+            get
+            {
+                if (map != null)
+                {
+                    return map;
+                }
+
+                map = new PrimitiveMapInterceptor(this, cloak.Map, IsReadOnly, true);
+                return map;
+            }
+        }
+
+        /// <summary>
+        /// Builds a new MapMessage around the result of the cloak's Copy().
+        /// </summary>
+        internal override Message Copy() => new MapMessage(cloak.Copy());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/Message.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/Message.cs b/src/main/csharp/Message/Message.cs
new file mode 100644
index 0000000..b2ba218
--- /dev/null
+++ b/src/main/csharp/Message/Message.cs
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Message
+{
+
+    using Cloak;
+    
+    /// <summary>
+    /// Kinds of acknowledgement that can be applied when a message is acknowledged.
+    /// NOTE(review): the member names look like the AMQP 1.0 delivery outcomes
+    /// (accepted, rejected, released, modified) — confirm against the consumer/session code.
+    /// </summary>
+    internal enum AckType
+    {
+        ACCEPTED = 0,
+        REJECTED = 1,
+        RELEASED = 2,
+        MODIFIED_FAILED = 3,
+        MODIFIED_FAILED_UNDELIVERABLE = 4,
+    }
+
+    /// <summary>
+    /// Apache.NMS.AMQP.Message.Message is the root message class that implements the Apache.NMS.IMessage interface.
+    /// Apache.NMS.AMQP.Message.Message uses the Apache.NMS.AMQP.Message.Cloak.IMessageCloak interface to detach from the underlying AMQP 1.0 engine.
+    /// </summary>
+    class Message : IMessage
+    {
+
+        public static readonly string MESSAGE_VENDOR_ACK_PROP = PropertyUtil.CreateProperty("ACK.TYPE", "AMQP");
+
+        // Every header, property and body operation of this class is forwarded to the cloak.
+        protected readonly IMessageCloak cloak;
+        // Created on first access of Properties.
+        private IPrimitiveMap propertyHelper = null;
+
+        #region Constructors
+
+        /// <summary>
+        /// Wraps <paramref name="message"/>; all members of this class delegate to it.
+        /// </summary>
+        internal Message(IMessageCloak message)
+        {
+            this.cloak = message;
+        }
+
+        #endregion
+
+        #region Protected Methods
+
+        /// <summary>
+        /// Throws when the message body is read-only.
+        /// </summary>
+        /// <exception cref="MessageNotWriteableException">IsReadOnly is true.</exception>
+        protected void FailIfReadOnlyMsgBody()
+        {
+            if (IsReadOnly)
+            {
+                throw new MessageNotWriteableException("Message is in Read-Only mode.");
+            }
+        }
+
+        /// <summary>
+        /// Throws when the message body is not read-only (i.e. still being written).
+        /// </summary>
+        /// <exception cref="MessageNotReadableException">IsReadOnly is false.</exception>
+        protected void FailIfWriteOnlyMsgBody()
+        {
+            if (!IsReadOnly)
+            {
+                throw new MessageNotReadableException("Message is in Write-Only mode.");
+            }
+        }
+
+        #endregion
+
+        #region Public Properties
+
+        /// <summary>The cloak's Content.</summary>
+        public virtual byte[] Content
+        {
+            get
+            {
+                return cloak.Content;
+            }
+
+            set
+            {
+                cloak.Content = value;
+            }
+        }
+
+        /// <summary>Read-only flag for the body, stored on the cloak as IsBodyReadOnly.</summary>
+        public virtual bool IsReadOnly
+        {
+            get { return cloak.IsBodyReadOnly; }
+            internal set { cloak.IsBodyReadOnly = value; }
+        }
+
+        /// <summary>Read-only flag for the properties, stored on the cloak as IsPropertiesReadOnly.</summary>
+        public virtual bool IsReadOnlyProperties
+        {
+            get { return cloak.IsPropertiesReadOnly; }
+            internal set { cloak.IsPropertiesReadOnly = value; }
+        }
+
+        #endregion
+
+        #region IMessage Properties
+
+        // Each NMS header below reads from and writes to the cloak member of the same name.
+
+        public string NMSCorrelationID
+        {
+            get { return cloak.NMSCorrelationID; }
+            set { cloak.NMSCorrelationID = value; }
+        }
+
+        public MsgDeliveryMode NMSDeliveryMode
+        {
+            get { return cloak.NMSDeliveryMode; }
+            set { cloak.NMSDeliveryMode = value; }
+        }
+
+        public IDestination NMSDestination
+        {
+            get { return cloak.NMSDestination; }
+            set { cloak.NMSDestination = value; }
+        }
+
+        public string NMSMessageId
+        {
+            get { return cloak.NMSMessageId; }
+            set { cloak.NMSMessageId = value; }
+        }
+
+        public MsgPriority NMSPriority
+        {
+            get { return cloak.NMSPriority; }
+            set { cloak.NMSPriority = value; }
+        }
+
+        public bool NMSRedelivered
+        {
+            get { return cloak.NMSRedelivered; }
+            set { cloak.NMSRedelivered = value; }
+        }
+
+        public IDestination NMSReplyTo
+        {
+            get { return cloak.NMSReplyTo; }
+            set { cloak.NMSReplyTo = value; }
+        }
+
+        public DateTime NMSTimestamp
+        {
+            get { return cloak.NMSTimestamp; }
+            set { cloak.NMSTimestamp = value; }
+        }
+
+        public TimeSpan NMSTimeToLive
+        {
+            get { return cloak.NMSTimeToLive; }
+            set { cloak.NMSTimeToLive = value; }
+        }
+
+        public string NMSType
+        {
+            get { return cloak.NMSType; }
+            set { cloak.NMSType = value; }
+        }
+
+        /// <summary>
+        /// The message properties, exposed through an NMSMessagePropertyInterceptor over the cloak's
+        /// Properties. The interceptor is created on first access and reused afterwards.
+        /// </summary>
+        public IPrimitiveMap Properties
+        {
+            get
+            {
+                if(propertyHelper == null)
+                {
+                    propertyHelper = new NMSMessagePropertyInterceptor(this, cloak.Properties);
+                }
+                return propertyHelper;
+            }
+        }
+
+        #endregion
+
+        #region IMessage Methods
+
+        /// <summary>Forwards to the cloak's Acknowledge().</summary>
+        public virtual void Acknowledge()
+        {
+            cloak.Acknowledge();
+        }
+
+        /// <summary>Forwards to the cloak's ClearBody().</summary>
+        public virtual void ClearBody()
+        {
+            cloak.ClearBody();
+        }
+
+        /// <summary>
+        /// Clears the message properties through the property interceptor.
+        /// </summary>
+        public void ClearProperties()
+        {
+            // Go through the Properties getter rather than the backing field: the interceptor is
+            // created lazily, so the field is still null if Properties has never been read.
+            Properties.Clear();
+        }
+
+        #endregion
+
+        #region Internal Methods
+
+        /// <summary>Returns the cloak this message wraps.</summary>
+        internal IMessageCloak GetMessageCloak()
+        {
+            return cloak;
+        }
+
+        /// <summary>Builds a new Message around the result of the cloak's Copy().</summary>
+        internal virtual Message Copy()
+        {
+            return new Message(this.cloak.Copy());
+        }
+
+        /// <summary>Extension hook with an empty default implementation.</summary>
+        protected virtual void CopyInto(Message other)
+        {
+
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Returns the default object string followed by the cloak's string form.
+        /// </summary>
+        public override string ToString()
+        {
+            return base.ToString() + ":\n Impl Type: " + cloak.ToString();
+        }
+
+    }
+
+    /// <summary>
+    /// Acknowledges a single message on behalf of a consumer, using either an explicitly assigned
+    /// AckType or MessageSupport.DEFAULT_ACK_TYPE when none has been assigned.
+    /// </summary>
+    internal class MessageAcknowledgementHandler
+    {
+
+        private MessageConsumer consumer;
+        private Session session;
+        private Message message;
+
+        // null means no acknowledgement type has been assigned explicitly.
+        private AckType? atype = null;
+
+        /// <summary>
+        /// Binds the handler to a consumer, that consumer's session, and the message to acknowledge.
+        /// </summary>
+        public MessageAcknowledgementHandler(MessageConsumer mc, Message msg)
+        {
+            consumer = mc;
+            message = msg;
+            session = mc.Session;
+        }
+
+        /// <summary>
+        /// The acknowledgement type to use; falls back to MessageSupport.DEFAULT_ACK_TYPE while unset.
+        /// </summary>
+        public AckType AcknowledgementType
+        {
+            get
+            {
+                if (atype.HasValue)
+                {
+                    return atype.Value;
+                }
+                return MessageSupport.DEFAULT_ACK_TYPE;
+            }
+            set
+            {
+                atype = value;
+            }
+        }
+
+        /// <summary>Discards any explicitly assigned acknowledgement type.</summary>
+        public void ClearAckType()
+        {
+            atype = null;
+        }
+
+        /// <summary>True when an acknowledgement type has been assigned explicitly.</summary>
+        public bool IsAckTypeSet
+        {
+            get { return atype.HasValue; }
+        }
+
+        /// <summary>
+        /// Acknowledges through the consumer when the session is in IndividualAcknowledge mode,
+        /// otherwise through the session.
+        /// </summary>
+        public void Acknowledge()
+        {
+            bool individual = session.AcknowledgementMode.Equals(AcknowledgementMode.IndividualAcknowledge);
+            if (!individual)
+            {
+                // Session Ackmode  AcknowledgementMode.ClientAcknowledge
+                session.Acknowledge(AcknowledgementType);
+                return;
+            }
+
+            consumer.AcknowledgeMessage(message, AcknowledgementType);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/ObjectMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/ObjectMessage.cs b/src/main/csharp/Message/ObjectMessage.cs
new file mode 100644
index 0000000..d2a7054
--- /dev/null
+++ b/src/main/csharp/Message/ObjectMessage.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Runtime.Serialization;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Util.Types;
+
+namespace Apache.NMS.AMQP.Message
+{
+    using Cloak;
+    /// <summary>
+    /// Object message for the AMQP provider: derives from Message and implements IObjectMessage.
+    /// The payload is read and written through an IObjectMessageCloak.
+    /// </summary>
+    class ObjectMessage : Message, IObjectMessage
+    {
+        // Hides the base-class cloak field with the object-specific cloak type.
+        protected new readonly IObjectMessageCloak cloak;
+
+        internal ObjectMessage(IObjectMessageCloak message) : base(message)
+        {
+            cloak = message;
+        }
+
+        /// <summary>
+        /// Hides Message.Content (declared with <c>new</c>). Reading returns the cloak's Content;
+        /// the setter has an empty body, so assigned values are discarded.
+        /// NOTE(review): confirm that ignoring assignments here is intentional.
+        /// </summary>
+        public new byte[] Content
+        {
+            get => cloak.Content;
+            set
+            {
+                // No-op.
+            }
+        }
+
+        /// <summary>
+        /// The object payload held by the cloak. A SerializationException raised while assigning
+        /// is converted via NMSExceptionSupport.CreateMessageFormatException and rethrown.
+        /// </summary>
+        public object Body
+        {
+            get => cloak.Body;
+            set
+            {
+                try
+                {
+                    cloak.Body = value;
+                }
+                catch (SerializationException failure)
+                {
+                    throw NMSExceptionSupport.CreateMessageFormatException(failure);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds a new ObjectMessage around the result of the cloak's Copy().
+        /// </summary>
+        internal override Message Copy() => new ObjectMessage(cloak.Copy());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/StreamMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/StreamMessage.cs b/src/main/csharp/Message/StreamMessage.cs
new file mode 100644
index 0000000..3891df6
--- /dev/null
+++ b/src/main/csharp/Message/StreamMessage.cs
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.IO;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.AMQP.Util.Types;
+
+namespace Apache.NMS.AMQP.Message
+{
+    using Cloak;
+    /// <summary>
+    /// NMS stream message. Values are appended with the cloak's Put and consumed with the
+    /// cloak's Peek/Pop pair; a byte array element may be read in several ReadBytes calls.
+    /// </summary>
+    class StreamMessage : Message, IStreamMessage
+    {
+        // Sentinel for RemainingBytesInBuffer: no ReadBytes call is partially complete.
+        private const int NO_BYTES_IN_BUFFER = -1;
+
+        private readonly new IStreamMessageCloak cloak;
+
+        // Number of bytes of the current byte[] element that ReadBytes has not handed out yet.
+        private int RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+
+        // The byte[] element currently being read piecewise by ReadBytes; null when there is none.
+        private byte[] Buffer = null;
+
+        /// <summary>
+        /// Wraps the given cloak in a stream message.
+        /// </summary>
+        /// <param name="message">The cloak that stores the stream elements.</param>
+        internal StreamMessage(IStreamMessageCloak message) : base(message)
+        {
+            cloak = message;
+        }
+
+        #region IStreamMessage Methods
+
+        /// <summary>Reads the next element as a bool; a null element reads as false.</summary>
+        public bool ReadBoolean()
+        {
+            return ReadNext<bool>(v => ConversionSupport.ConvertNMSType<bool>(v));
+        }
+
+        /// <summary>Reads the next element as a byte; a null element reads as 0.</summary>
+        public byte ReadByte()
+        {
+            return ReadNext<byte>(v => ConversionSupport.ConvertNMSType<byte>(v));
+        }
+
+        /// <summary>
+        /// Copies bytes of the next byte array element into <paramref name="value"/>.
+        /// When the target is smaller than the element, further calls continue where the
+        /// previous call stopped; the element is consumed once all its bytes were copied.
+        /// </summary>
+        /// <param name="value">Target array; must not be null.</param>
+        /// <returns>The number of bytes copied, or -1 when the next element is null.</returns>
+        /// <exception cref="NullReferenceException">When <paramref name="value"/> is null.</exception>
+        /// <exception cref="MessageFormatException">When the next element is not a byte array.</exception>
+        public int ReadBytes(byte[] value)
+        {
+            FailIfWriteOnlyMsgBody();
+            if (value == null)
+            {
+                throw new NullReferenceException("Target byte array is null.");
+            }
+            if (RemainingBytesInBuffer == NO_BYTES_IN_BUFFER)
+            {
+                object data = cloak.Peek();
+                if (data == null)
+                {
+                    // Consume the null element; otherwise every following read would
+                    // peek the same null entry again and the stream could not advance.
+                    cloak.Pop();
+                    return -1;
+                }
+                byte[] bytes = data as byte[];
+                if (bytes == null)
+                {
+                    throw new MessageFormatException("Next stream value is not a byte array.");
+                }
+                Buffer = bytes;
+                RemainingBytesInBuffer = Buffer.Length;
+            }
+            int bufferOffset = Buffer.Length - RemainingBytesInBuffer;
+            int copyLength = Math.Min(value.Length, RemainingBytesInBuffer);
+            if (copyLength > 0)
+            {
+                Array.Copy(Buffer, bufferOffset, value, 0, copyLength);
+            }
+            RemainingBytesInBuffer -= copyLength;
+            if (RemainingBytesInBuffer == 0)
+            {
+                // The whole element has been handed out: drop it and leave buffered-read mode.
+                RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+                Buffer = null;
+                cloak.Pop();
+            }
+            return copyLength;
+        }
+
+        /// <summary>Reads the next element as a char.</summary>
+        /// <exception cref="NullReferenceException">When the next element is null; the element is not consumed.</exception>
+        public char ReadChar()
+        {
+            FailIfWriteOnlyMsgBody();
+            FailIfBytesInBuffer();
+            object value = cloak.Peek();
+            if (value == null)
+            {
+                throw new NullReferenceException("Cannot convert NULL value to char.");
+            }
+            char result = ConversionSupport.ConvertNMSType<char>(value);
+            cloak.Pop();
+            return result;
+        }
+
+        /// <summary>Reads the next element as a double; a null element reads as 0.</summary>
+        public double ReadDouble()
+        {
+            return ReadNext<double>(v => ConversionSupport.ConvertNMSType<double>(v));
+        }
+
+        /// <summary>Reads the next element as a short; a null element reads as 0.</summary>
+        public short ReadInt16()
+        {
+            return ReadNext<short>(v => ConversionSupport.ConvertNMSType<short>(v));
+        }
+
+        /// <summary>Reads the next element as an int; a null element reads as 0.</summary>
+        public int ReadInt32()
+        {
+            return ReadNext<int>(v => ConversionSupport.ConvertNMSType<int>(v));
+        }
+
+        /// <summary>Reads the next element as a long; a null element reads as 0.</summary>
+        public long ReadInt64()
+        {
+            return ReadNext<long>(v => ConversionSupport.ConvertNMSType<long>(v));
+        }
+
+        /// <summary>
+        /// Reads the next element without conversion. Byte arrays are returned as a copy.
+        /// </summary>
+        /// <returns>The element, or null when the element is null.</returns>
+        public object ReadObject()
+        {
+            FailIfWriteOnlyMsgBody();
+            FailIfBytesInBuffer();
+            object result = null;
+            try
+            {
+                object value = cloak.Peek();
+                byte[] bytes = value as byte[];
+                if (bytes != null)
+                {
+                    // Hand out a copy so the caller cannot modify the stored element.
+                    result = bytes.Clone();
+                }
+                else if (value != null && ConversionSupport.IsNMSType(value))
+                {
+                    result = value;
+                }
+                // NOTE(review): an element of any other type is returned as null and is
+                // still consumed below - confirm this is intended.
+            }
+            catch (NMSException)
+            {
+                // Exceptions that already are NMS exceptions keep their specific type,
+                // as they do in the other Read methods, instead of being rewrapped below.
+                throw;
+            }
+            catch (EndOfStreamException eos)
+            {
+                throw NMSExceptionSupport.CreateMessageEOFException(eos);
+            }
+            catch (IOException ioe)
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(ioe);
+            }
+            catch (Exception e)
+            {
+                Tracer.InfoFormat("Unexpected exception caught reading Object stream. Exception = {0}", e);
+                throw NMSExceptionSupport.Create("Unexpected exception caught reading Object stream.", e);
+            }
+            cloak.Pop();
+            return result;
+        }
+
+        /// <summary>Reads the next element as a float; a null element reads as 0.</summary>
+        public float ReadSingle()
+        {
+            return ReadNext<float>(v => ConversionSupport.ConvertNMSType<float>(v));
+        }
+
+        /// <summary>Reads the next element as a string; a null element reads as null.</summary>
+        public string ReadString()
+        {
+            return ReadNext<string>(v => ConversionSupport.ConvertNMSType<string>(v));
+        }
+
+        /// <summary>
+        /// Abandons any partial ReadBytes state, marks the body read-only and resets the cloak.
+        /// </summary>
+        public void Reset()
+        {
+            RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+            Buffer = null;
+            IsReadOnly = true;
+            cloak.Reset();
+        }
+
+        /// <summary>
+        /// Abandons any partial ReadBytes state, marks the body writable and clears the body
+        /// through the base class.
+        /// </summary>
+        public override void ClearBody()
+        {
+            RemainingBytesInBuffer = NO_BYTES_IN_BUFFER;
+            Buffer = null;
+            IsReadOnly = false;
+
+            base.ClearBody();
+        }
+
+        /// <summary>Appends a bool element.</summary>
+        public void WriteBoolean(bool value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends a byte element.</summary>
+        public void WriteByte(byte value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends a copy of the whole array as one byte array element.</summary>
+        public void WriteBytes(byte[] value)
+        {
+            WriteBytes(value, 0, value.Length);
+        }
+
+        /// <summary>
+        /// Appends a copy of <paramref name="length"/> bytes of <paramref name="value"/>,
+        /// starting at <paramref name="offset"/>, as one byte array element.
+        /// </summary>
+        public void WriteBytes(byte[] value, int offset, int length)
+        {
+            FailIfReadOnlyMsgBody();
+            byte[] entry = new byte[length];
+            Array.Copy(value, offset, entry, 0, length);
+            cloak.Put(entry);
+        }
+
+        /// <summary>Appends a char element.</summary>
+        public void WriteChar(char value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends a double element.</summary>
+        public void WriteDouble(double value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends a short element.</summary>
+        public void WriteInt16(short value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends an int element.</summary>
+        public void WriteInt32(int value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends a long element.</summary>
+        public void WriteInt64(long value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>
+        /// Appends a null, a byte array (copied) or any value accepted by
+        /// ConversionSupport.IsNMSType; any other type raises a message format exception.
+        /// </summary>
+        public void WriteObject(object value)
+        {
+            FailIfReadOnlyMsgBody();
+            if (value == null)
+            {
+                cloak.Put(value);
+            }
+            else if (value is byte[])
+            {
+                WriteBytes(value as byte[]);
+            }
+            else if (ConversionSupport.IsNMSType(value))
+            {
+                cloak.Put(value);
+            }
+            else
+            {
+                throw NMSExceptionSupport.CreateMessageFormatException(new Exception("Unsupported Object type: " + value.GetType().Name));
+            }
+        }
+
+        /// <summary>Appends a float element.</summary>
+        public void WriteSingle(float value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        /// <summary>Appends a string element.</summary>
+        public void WriteString(string value)
+        {
+            FailIfReadOnlyMsgBody();
+            cloak.Put(value);
+        }
+
+        #endregion
+
+        #region Validation Methods
+
+        /// <summary>
+        /// Throws when a byte array element has been partially read by ReadBytes and not finished.
+        /// </summary>
+        protected void FailIfBytesInBuffer()
+        {
+            if (RemainingBytesInBuffer != NO_BYTES_IN_BUFFER)
+            {
+                throw new MessageFormatException("Unfinished Buffered read for ReadBytes(byte[] value)");
+            }
+        }
+
+        #endregion
+
+        #region Private Helpers
+
+        // Shared body of the typed Read methods: peek the next element, convert it
+        // (a null element yields default(T)), and consume it only after the conversion succeeded.
+        private T ReadNext<T>(Func<object, T> convert)
+        {
+            FailIfWriteOnlyMsgBody();
+            FailIfBytesInBuffer();
+            object value = cloak.Peek();
+            T result = (value == null) ? default(T) : convert(value);
+            cloak.Pop();
+            return result;
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Creates a new stream message around a copy of this message's cloak.
+        /// </summary>
+        internal override Message Copy()
+        {
+            return new StreamMessage(this.cloak.Copy());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Message/TextMessage.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Message/TextMessage.cs b/src/main/csharp/Message/TextMessage.cs
new file mode 100644
index 0000000..e7a6169
--- /dev/null
+++ b/src/main/csharp/Message/TextMessage.cs
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP.Message
+{
+    using Cloak;
+    /// <summary>
+    /// NMS text message: a <see cref="Message"/> that also implements <see cref="Apache.NMS.ITextMessage"/>.
+    /// The text is stored in an <see cref="ITextMessageCloak"/>, which keeps this class detached
+    /// from the underlying AMQP 1.0 engine.
+    /// </summary>
+    class TextMessage : Message, ITextMessage
+    {
+        // Text-message typed reference to the cloak instance passed to the base constructor.
+        protected readonly new ITextMessageCloak cloak;
+
+        #region Constructor
+
+        /// <summary>
+        /// Wraps the given cloak in a text message.
+        /// </summary>
+        /// <param name="cloak">The cloak that stores the text.</param>
+        internal TextMessage(ITextMessageCloak cloak) : base(cloak)
+        {
+            this.cloak = cloak;
+        }
+
+        #endregion
+
+        #region ITextMessage Properties
+
+        /// <summary>
+        /// The text body. Assigning it first checks that the message body is writable.
+        /// </summary>
+        public string Text
+        {
+            get { return cloak.Text; }
+            set
+            {
+                FailIfReadOnlyMsgBody();
+                cloak.Text = value;
+            }
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Creates a new text message around a copy of this message's cloak.
+        /// </summary>
+        internal override Message Copy()
+        {
+            ITextMessageCloak duplicate = this.cloak.Copy();
+            return new TextMessage(duplicate);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
new file mode 100644
index 0000000..2ee9312
--- /dev/null
+++ b/src/main/csharp/MessageConsumer.cs
@@ -0,0 +1,977 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Amqp;
+using Amqp.Framing;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.AMQP.Util.Types.Queue;
+using Apache.NMS.AMQP.Message.Cloak;
+
+namespace Apache.NMS.AMQP
+{
+    /// <summary>
+    /// MessageConsumer implements the <see cref="Apache.NMS.IMessageConsumer"/> interface.
+    /// This class configures and manages an AMQP receiver link.
+    /// The message consumer can be configured to receive messages asynchronously or synchronously.
+    /// </summary>
+    class MessageConsumer : MessageLink, IMessageConsumer
+    {
+        // The executor threshold limits the number of message listener dispatch events that can be on the session's executor at a given time.
+        private const int ExecutorThreshold = 2;
+        private ConsumerInfo consumerInfo;
+        private readonly string selector;
+        private Apache.NMS.Util.Atomic<bool> hasStarted = new Apache.NMS.Util.Atomic<bool>(false);
+        private MessageCallback OnInboundAMQPMessage;
+        // Locally queued deliveries that have not been handed to the application yet.
+        private IMessageQueue messageQueue;
+        // Deliveries whose message has an acknowledgement handler and still awaits acknowledgement.
+        private LinkedList<IMessageDelivery> delivered;
+        // Created signaled; reset by EnterMessageListenerEvent and set again by LeaveMessageListenerEvent.
+        private System.Threading.ManualResetEvent MessageListenerInUseEvent = new System.Threading.ManualResetEvent(true);
+        // Counts the message delivery tasks currently pending on the session's executor.
+        // This should limit the number of delivery tasks queued on the executor thread.
+        private volatile int pendingMessageDeliveryTasks = 0;
+        // Highest value of pendingMessageDeliveryTasks observed by AddTaskRef.
+        private volatile int MaxPendingTasks = 0;
+
+        // Statistic counters useful for debugging.
+        // TODO create statistic container for counters maybe use ConsumerInfo?
+        private int transportMsgCount = 0;
+        private int messageDispatchCount = 0;
+
+        #region Constructor
+
+        /// <summary>
+        /// Creates a consumer on <paramref name="dest"/> without a subscription name or selector.
+        /// </summary>
+        internal MessageConsumer(Session ses, Destination dest)
+            : this(ses, dest, null, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a consumer on <paramref name="dest"/> without a subscription name or selector.
+        /// </summary>
+        internal MessageConsumer(Session ses, IDestination dest)
+            : this(ses, dest, null, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a consumer, records its settings in a new <see cref="ConsumerInfo"/>, creates the
+        /// local message queue and delivered list, and then calls Configure().
+        /// </summary>
+        /// <param name="ses">Owning session; also supplies the consumer id generator.</param>
+        /// <param name="dest">Destination to consume from.</param>
+        /// <param name="name">Subscription name; may be null.</param>
+        /// <param name="selector">Message selector; may be null.</param>
+        /// <param name="noLocal">Stored in the consumer info as NoLocal.</param>
+        internal MessageConsumer(Session ses, IDestination dest, string name, string selector, bool noLocal = false)
+            : base(ses, dest)
+        {
+            this.selector = selector;
+
+            consumerInfo = new ConsumerInfo(ses.ConsumerIdGenerator.GenerateId());
+            consumerInfo.SubscriptionName = name;
+            consumerInfo.Selector = this.selector;
+            consumerInfo.NoLocal = noLocal;
+            Info = consumerInfo;
+
+            delivered = new LinkedList<IMessageDelivery>();
+            messageQueue = new PriorityMessageQueue();
+
+            Configure();
+        }
+        
+        #endregion
+
+        #region Private Properties
+
+        /// <summary>
+        /// The base class link viewed as an <see cref="IReceiverLink"/>; null when it is not one.
+        /// </summary>
+        protected new IReceiverLink Link
+        {
+            get
+            {
+                IReceiverLink receiver = base.Link as IReceiverLink;
+                return receiver;
+            }
+        }
+
+        // IsBrowser is a stub for an inherited Browser subclass
+        protected virtual bool IsBrowser => false;
+
+        #endregion
+
+        #region Internal Properties
+
+        /// <summary>Identifier taken from this consumer's Info.</summary>
+        internal Id ConsumerId => this.Info.Id;
+
+        /// <summary>True when the consumer info carries a non-empty subscription name.</summary>
+        internal virtual bool IsDurable => !string.IsNullOrEmpty(this.consumerInfo.SubscriptionName);
+
+        /// <summary>True when the consumer info carries a non-empty selector.</summary>
+        internal virtual bool HasSelector => !string.IsNullOrEmpty(this.consumerInfo.Selector);
+        
+        #endregion
+
+        #region Private Methods
+
+        /// <summary>
+        /// Records a received delivery: when its message has an acknowledgement handler the
+        /// delivery is appended to the delivered list, otherwise it is accepted right away.
+        /// </summary>
+        private void AckReceived(IMessageDelivery delivery)
+        {
+            bool hasAckHandler = delivery.Message.GetMessageCloak().AckHandler != null;
+            if (!hasAckHandler)
+            {
+                AckConsumed(delivery);
+                return;
+            }
+
+            delivered.AddLast(delivery);
+        }
+        
+        /// <summary>
+        /// Removes the delivery from the delivered list and accepts its AMQP message on the link.
+        /// </summary>
+        private void AckConsumed(IMessageDelivery delivery)
+        {
+            Message.Message consumed = delivery.Message;
+            Tracer.DebugFormat("Consumer {0} Acking Accepted for Message {1} ", ConsumerId, consumed.NMSMessageId);
+
+            delivered.Remove(delivery);
+
+            Message.AMQP.AMQPMessageCloak amqpCloak = consumed.GetMessageCloak() as Message.AMQP.AMQPMessageCloak;
+            Amqp.Message accepted = amqpCloak.AMQPMessage;
+            this.Link.Accept(accepted);
+        }
+
+        /// <summary>
+        /// Releases the delivery's AMQP message on the link.
+        /// </summary>
+        private void AckReleased(IMessageDelivery delivery)
+        {
+            Message.Message released = delivery.Message;
+            Tracer.DebugFormat("Consumer {0} Acking Released for Message {1} ", ConsumerId, released.NMSMessageId);
+
+            Message.AMQP.AMQPMessageCloak amqpCloak = released.GetMessageCloak() as Message.AMQP.AMQPMessageCloak;
+            Amqp.Message toRelease = amqpCloak.AMQPMessage;
+            this.Link.Release(toRelease);
+        }
+
+        /// <summary>
+        /// Rejects the delivery with an internal-error condition whose description is the
+        /// message of <paramref name="ex"/>.
+        /// </summary>
+        private void AckRejected(IMessageDelivery delivery, NMSException ex)
+        {
+            Error rejection = new Error(NMSErrorCode.INTERNAL_ERROR);
+            rejection.Description = ex.Message;
+
+            AckRejected(delivery, rejection);
+        }
+        
+        /// <summary>
+        /// Rejects the delivery's AMQP message on the link.
+        /// </summary>
+        /// <param name="delivery">The delivery to reject.</param>
+        /// <param name="err">Optional error passed along with the rejection; may be null.</param>
+        private void AckRejected(IMessageDelivery delivery, Error err = null)
+        {
+            Tracer.DebugFormat("Consumer {0} Acking Rejected for Message {1} with error {2} ", ConsumerId, delivery.Message.NMSMessageId, err?.ToString());
+
+            Message.AMQP.AMQPMessageCloak amqpCloak = delivery.Message.GetMessageCloak() as Message.AMQP.AMQPMessageCloak;
+            Amqp.Message toReject = amqpCloak.AMQPMessage;
+            this.Link.Reject(toReject, err);
+        }
+
+        /// <summary>
+        /// Settles the delivery's AMQP message on the link as modified.
+        /// </summary>
+        /// <param name="delivery">The delivery to settle.</param>
+        /// <param name="deliveryFailed">Forwarded to the link's Modify call.</param>
+        /// <param name="undeliverableHere">Forwarded to the link's Modify call.</param>
+        private void AckModified(IMessageDelivery delivery, bool deliveryFailed, bool undeliverableHere = false)
+        {
+            Message.Message modified = delivery.Message;
+            string failedNote = deliveryFailed ? " Delivery Failed" : "";
+            string undeliverableNote = undeliverableHere ? " Undeliveryable Here" : "";
+            Tracer.DebugFormat("Consumer {0} Acking Modified for Message {1}{2}{3}.", ConsumerId, modified.NMSMessageId, failedNote, undeliverableNote);
+
+            Message.AMQP.AMQPMessageCloak amqpCloak = modified.GetMessageCloak() as Message.AMQP.AMQPMessageCloak;
+            Amqp.Message toModify = amqpCloak.AMQPMessage;
+            //TODO use Link.Modified from amqpNetLite 2.0.0
+            this.Link.Modify(toModify, deliveryFailed, undeliverableHere, null);
+        }
+
+        /// <summary>
+        /// Tells whether the message's redelivery count is above the connection's redelivery limit.
+        /// </summary>
+        /// <returns>
+        /// false when the connection has no redelivery policy or the policy's maximum is negative;
+        /// otherwise whether the cloak's RedeliveryCount is greater than the maximum.
+        /// </returns>
+        private bool IsMessageRedeliveryExceeded(IMessageDelivery delivery)
+        {
+            Message.Message message = delivery.Message;
+            IRedeliveryPolicy policy = this.Session.Connection.RedeliveryPolicy;
+
+            bool limitConfigured = policy != null && policy.MaximumRedeliveries >= 0;
+            if (!limitConfigured)
+            {
+                return false;
+            }
+
+            return message.GetMessageCloak().RedeliveryCount > policy.MaximumRedeliveries;
+        }
+
+        /// <summary>
+        /// Tells whether the message's timestamp plus time-to-live lies before the current UTC time.
+        /// Messages with a zero time-to-live never expire, and a browser never reports expiry.
+        /// </summary>
+        private bool IsMessageExpired(IMessageDelivery delivery)
+        {
+            Message.Message message = delivery.Message;
+            if (message.NMSTimeToLive == TimeSpan.Zero)
+            {
+                return false;
+            }
+
+            DateTime now = DateTime.UtcNow;
+            return !IsBrowser && (message.NMSTimestamp + message.NMSTimeToLive) < now;
+        }
+
+        #endregion
+
+        #region Protected Methods
+
+        /// <summary>
+        /// Resets the listener-in-use event unless its handle is already closed.
+        /// Any exception is logged and not propagated.
+        /// </summary>
+        protected void EnterMessageListenerEvent()
+        {
+            try
+            {
+                bool handleClosed = MessageListenerInUseEvent.SafeWaitHandle.IsClosed;
+                if (!handleClosed)
+                {
+                    MessageListenerInUseEvent.Reset();
+                }
+            }
+            catch (Exception e)
+            {
+                Tracer.ErrorFormat("Failed to Reset MessageListener Event signal. Error : {0}", e);
+            }
+        }
+
+        /// <summary>
+        /// Decrements the pending delivery task count, then sets the listener-in-use event
+        /// unless its handle is already closed. Any exception from the event is logged and not propagated.
+        /// </summary>
+        protected void LeaveMessageListenerEvent()
+        {
+            RemoveTaskRef();
+            try
+            {
+                bool handleClosed = MessageListenerInUseEvent.SafeWaitHandle.IsClosed;
+                if (!handleClosed)
+                {
+                    MessageListenerInUseEvent.Set();
+                }
+            }
+            catch (Exception e)
+            {
+                Tracer.ErrorFormat("Failed to Send MessageListener Event signal. Error : {0}", e);
+            }
+        }
+
+        /// <summary>
+        /// Waits for the listener-in-use event when a message listener is registered.
+        /// </summary>
+        /// <param name="timeout">Wait time in milliseconds; values below 0 wait without a time limit.</param>
+        /// <returns>
+        /// The result of the wait; false when no listener is registered or the event handle is closed.
+        /// </returns>
+        protected bool WaitOnMessageListenerEvent(int timeout = -1)
+        {
+            if (OnMessage == null)
+            {
+                return false;
+            }
+
+            if (!MessageListenerInUseEvent.SafeWaitHandle.IsClosed)
+            {
+                return (timeout > -1)
+                    ? MessageListenerInUseEvent.WaitOne(timeout)
+                    : MessageListenerInUseEvent.WaitOne();
+            }
+
+            // The handle is gone; only worth a warning while the consumer is still open.
+            if (!this.IsClosed)
+            {
+                Tracer.WarnFormat("Failed to wait for Message Listener Event on consumer {0}", Id);
+            }
+
+            return false;
+        }
+
+        /// <summary>
+        /// Increments the pending delivery task count and raises the recorded maximum when exceeded.
+        /// </summary>
+        protected void AddTaskRef()
+        {
+            System.Threading.Interlocked.Increment(ref pendingMessageDeliveryTasks);
+
+            int previousMax = MaxPendingTasks;
+            MaxPendingTasks = Math.Max(pendingMessageDeliveryTasks, MaxPendingTasks);
+
+            bool highWaterMarkRaised = previousMax < MaxPendingTasks;
+            if (highWaterMarkRaised)
+            {
+                // Tracing of the new high-water mark is currently disabled:
+                //Tracer.WarnFormat("Consumer {0} Distpatch event highwatermark increased to {1}.", Id, MaxPendingTasks);
+            }
+        }
+
+        /// <summary>
+        /// Atomically decrements the pending delivery task count.
+        /// </summary>
+        protected void RemoveTaskRef()
+        {
+            System.Threading.Interlocked.Add(ref pendingMessageDeliveryTasks, -1);
+        }
+        
+        /// <summary>
+        /// Converts an inbound AMQP message into a provider message and queues it for delivery.
+        /// In auto- and client-acknowledge sessions the message gets an acknowledgement handler.
+        /// Any exception raised while building the message is reported to the session and the
+        /// message is not queued.
+        /// </summary>
+        /// <param name="link">The receiver link the message arrived on (not used here).</param>
+        /// <param name="message">The inbound AMQP message.</param>
+        protected void OnInboundMessage(IReceiverLink link, Amqp.Message message)
+        {
+            Message.Message msg = null;
+            try
+            {
+                msg = Message.AMQP.AMQPMessageBuilder.CreateProviderMessage(this, message) as Message.Message;
+
+                bool needsAckHandler =
+                    Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
+                    Session.AcknowledgementMode.Equals(AcknowledgementMode.ClientAcknowledge);
+                if (needsAckHandler)
+                {
+                    msg.GetMessageCloak().AckHandler = new Message.MessageAcknowledgementHandler(this, msg);
+                }
+            }
+            catch (Exception e)
+            {
+                this.Session.OnException(e);
+            }
+
+            if (msg == null)
+            {
+                return;
+            }
+
+            transportMsgCount++;
+            SendForDelivery(new MessageDelivery(msg));
+        }
+
+        /// <summary>
+        /// Puts the delivery on the local message queue and, when a listener is registered and
+        /// fewer than ExecutorThreshold delivery tasks are pending, schedules a listener dispatch
+        /// event on the session dispatcher.
+        /// </summary>
+        protected void SendForDelivery(IMessageDelivery nmsDelivery)
+        {
+            this.messageQueue.Enqueue(nmsDelivery);
+
+            bool canDispatch = this.OnMessage != null && pendingMessageDeliveryTasks < ExecutorThreshold;
+            if (canDispatch)
+            {
+                AddTaskRef();
+                DispatchEvent dispatchEvent = new MessageListenerDispatchEvent(this);
+                Session.Dispatcher.Enqueue(dispatchEvent);
+                return;
+            }
+
+            // A negative count means add/remove bookkeeping got out of step.
+            if (pendingMessageDeliveryTasks < 0)
+            {
+                Tracer.ErrorFormat("Consumer {0} has invalid pending tasks count on executor {1}.", Id, pendingMessageDeliveryTasks);
+            }
+        }
+
+        /// <summary>
+        /// Handles the attach response for this consumer's link: logs it, calls OnResponse(),
+        /// and reports an attach failure to the session when the link carries an error.
+        /// </summary>
+        protected virtual void OnAttachResponse(ILink link, Attach attachResponse)
+        {
+            Tracer.InfoFormat("Received Performative Attach response on Link: {0}, Response: {1}", ConsumerId, attachResponse.ToString());
+            OnResponse();
+
+            bool attachFailed = link.Error != null;
+            if (!attachFailed)
+            {
+                return;
+            }
+
+            this.Session.OnException(ExceptionSupport.GetException(link, "Consumer {0} Attach Failure.", this.ConsumerId));
+        }
+
+        /// <summary>
+        /// Sets the link credit to <paramref name="credit"/> unless the consumer mode is Stopped.
+        /// </summary>
+        protected void SendFlow(int credit)
+        {
+            if (mode.Value.Equals(Resource.Mode.Stopped))
+            {
+                return;
+            }
+
+            this.Link.SetCredit(credit, false);
+        }
+
+        /// <summary>
+        /// Takes the next deliverable message from the local message queue. Expired messages are
+        /// rejected and messages over the redelivery limit are settled as modified
+        /// (failed, undeliverable here); both are skipped and the wait continues.
+        /// </summary>
+        /// <param name="delivery">Receives the dequeued delivery; null when the method returns false.</param>
+        /// <param name="timeout">
+        /// 0 polls without waiting; a positive value is the total wait budget in milliseconds;
+        /// a negative value is passed unchanged to the queue on every attempt.
+        /// </param>
+        /// <returns>
+        /// true when a delivery was dequeued; false when nothing was available within the wait
+        /// budget or the link or the message queue is closed.
+        /// </returns>
+        protected virtual bool TryDequeue(out IMessageDelivery delivery, int timeout)
+        {
+            delivery = null;
+            DateTime deadline = DateTime.UtcNow + TimeSpan.FromMilliseconds(timeout);
+            Tracer.DebugFormat("Waiting for msg availability Deadline {0}", deadline);
+            try
+            {
+                while (true)
+                {
+                    if (timeout == 0)
+                    {
+                        delivery = this.messageQueue.DequeueNoWait();
+                    }
+                    else
+                    {
+                        delivery = this.messageQueue.Dequeue(timeout);
+                    }
+
+                    if (delivery == null)
+                    {
+                        if (timeout == 0 || this.Link.IsClosed || this.messageQueue.IsClosed)
+                        {
+                            return false;
+                        }
+                        else if (timeout > 0)
+                        {
+                            timeout = RemainingMilliseconds(deadline, DateTime.UtcNow);
+                        }
+                    }
+                    else if (IsMessageExpired(delivery))
+                    {
+                        DateTime now = DateTime.UtcNow;
+                        Error err = new Error(NMSErrorCode.PROPERTY_ERROR);
+                        err.Description = "Message Expired";
+                        AckRejected(delivery, err);
+                        if (timeout > 0)
+                        {
+                            timeout = RemainingMilliseconds(deadline, now);
+                        }
+                        if (Tracer.IsDebugEnabled)
+                        {
+                            Tracer.DebugFormat("{0} Filtered expired (deadline {1} Now {2}) message: {3}", ConsumerId, deadline, now, delivery);
+                        }
+                    }
+                    else if (IsMessageRedeliveryExceeded(delivery))
+                    {
+                        if (Tracer.IsDebugEnabled)
+                        {
+                            Tracer.DebugFormat("{0} Filtered Message with excessive Redelivery Count: {1}", ConsumerId, delivery);
+                        }
+                        AckModified(delivery, true, true);
+                        if (timeout > 0)
+                        {
+                            timeout = RemainingMilliseconds(deadline, DateTime.UtcNow);
+                        }
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                throw ExceptionSupport.Wrap(e, "Failed to Received message on consumer {0}.", ConsumerId);
+            }
+
+            return true;
+        }
+
+        // Whole milliseconds left until the deadline, clamped to [0, int.MaxValue].
+        // Uses TotalMilliseconds: TimeSpan.Milliseconds is only the 0-999 component of the
+        // interval and would cut any wait longer than a second down to its sub-second part.
+        private static int RemainingMilliseconds(DateTime deadline, DateTime now)
+        {
+            double remaining = (deadline - now).TotalMilliseconds;
+            if (remaining <= 0)
+            {
+                return 0;
+            }
+            return remaining >= int.MaxValue ? int.MaxValue : (int)remaining;
+        }
+
+        /// <summary>
+        /// Guards synchronous receive calls: throws when a message listener is registered,
+        /// i.e. when the consumer delivers asynchronously.
+        /// </summary>
+        /// <exception cref="IllegalStateException">When a message listener is registered.</exception>
+        protected void ThrowIfAsync()
+        {
+            if (this.OnMessage != null)
+            {
+                // The consumer has a listener, so it is the asynchronous kind.
+                throw new IllegalStateException("Cannot synchronously receive message on an asynchronous consumer " + consumerInfo);
+            }
+        }
+
+        /// <summary>
+        /// Schedules a listener dispatch event on the session dispatcher when a listener is
+        /// registered and the local message queue is not empty.
+        /// </summary>
+        protected void DrainMessageQueueIfAny()
+        {
+            bool nothingToDrain = OnMessage == null || messageQueue.Count <= 0;
+            if (nothingToDrain)
+            {
+                return;
+            }
+
+            DispatchEvent deliveryTask = new MessageListenerDispatchEvent(this);
+            Session.Dispatcher.Enqueue(deliveryTask);
+        }
+
+        /// <summary>
+        /// Puts a message into its delivered state: bytes and stream messages are reset, any
+        /// other message gets a read-only body, and the properties of every message are made
+        /// read-only. A null message is ignored.
+        /// </summary>
+        protected void PrepareMessageForDelivery(Message.Message message)
+        {
+            if (message == null)
+            {
+                return;
+            }
+
+            Message.BytesMessage bytesMessage = message as Message.BytesMessage;
+            Message.StreamMessage streamMessage = message as Message.StreamMessage;
+            if (bytesMessage != null)
+            {
+                bytesMessage.Reset();
+            }
+            else if (streamMessage != null)
+            {
+                streamMessage.Reset();
+            }
+            else
+            {
+                message.IsReadOnly = true;
+            }
+
+            message.IsReadOnlyProperties = true;
+        }
+
+        #endregion
+
+        #region Internal Methods
+
+        /// <summary>
+        /// Tells whether this consumer is open, durable and subscribed under <paramref name="name"/>
+        /// (case-sensitive comparison).
+        /// </summary>
+        internal bool HasSubscription(string name)
+        {
+            if (IsClosed || !IsDurable)
+            {
+                return false;
+            }
+
+            int comparison = String.Compare(name, this.consumerInfo.SubscriptionName, false);
+            return comparison == 0;
+        }
+
+        /// <summary>
+        /// Tells whether this consumer's destination equals <paramref name="destination"/>.
+        /// </summary>
+        internal bool IsUsingDestination(IDestination destination)
+        {
+            bool sameDestination = this.Destination.Equals(destination);
+            return sameDestination;
+        }
+
+        /// <summary>
+        /// Moves the deliveries in the delivered list back to delivery, newest first. Each one
+        /// gets its cloak's DeliveryCount incremented and its EnqueueFirst flag set before it is
+        /// passed to SendForDelivery. Runs under the message queue's SyncRoot lock and leaves the
+        /// delivered list empty.
+        /// </summary>
+        internal void Recover()
+        {
+            Tracer.DebugFormat("Session recover for consumer: {0}", Id);
+            lock (messageQueue.SyncRoot)
+            {
+                for (IMessageDelivery pending = delivered.Last?.Value;
+                     pending != null;
+                     pending = delivered.Last?.Value)
+                {
+                    IMessageCloak pendingCloak = pending.Message.GetMessageCloak();
+                    pendingCloak.DeliveryCount = pendingCloak.DeliveryCount + 1;
+                    (pending as MessageDelivery).EnqueueFirst = true;
+                    delivered.RemoveLast();
+                    SendForDelivery(pending);
+                }
+
+                delivered.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Acknowledges a single message with the given acknowledgement type.
+        /// The delivery is looked up in the delivered list; when none matches, a new
+        /// delivery wrapping the message is used instead.
+        /// </summary>
+        /// <param name="message">Message to acknowledge.</param>
+        /// <param name="ackType">Acknowledgement outcome to apply.</param>
+        /// <exception cref="NMSException">Thrown for an acknowledgement type that is not handled here.</exception>
+        internal void AcknowledgeMessage(Message.Message message, Message.AckType ackType)
+        {
+            ThrowIfClosed();
+
+            // The scan does not stop at the first hit, so the last matching entry wins.
+            IMessageDelivery target = null;
+            foreach (IMessageDelivery candidate in delivered)
+            {
+                if (candidate.Message.Equals(message))
+                {
+                    target = candidate;
+                }
+            }
+            target = target ?? new MessageDelivery(message);
+
+            if (ackType == Message.AckType.ACCEPTED)
+            {
+                AckConsumed(target);
+            }
+            else if (ackType == Message.AckType.MODIFIED_FAILED)
+            {
+                AckModified(target, true);
+            }
+            else if (ackType == Message.AckType.MODIFIED_FAILED_UNDELIVERABLE)
+            {
+                AckModified(target, true, true);
+            }
+            else if (ackType == Message.AckType.REJECTED)
+            {
+                AckRejected(target);
+            }
+            else if (ackType == Message.AckType.RELEASED)
+            {
+                AckReleased(target);
+            }
+            else
+            {
+                throw new NMSException("Unkown message acknowledgement type " + ackType);
+            }
+        }
+
+        /// <summary>
+        /// Applies one acknowledgement type to every delivery currently held in the
+        /// delivered list and then clears the list.
+        /// </summary>
+        /// <param name="ackType">
+        /// Acknowledgement outcome to apply. An unhandled type is logged as a warning
+        /// for each delivery and otherwise ignored.
+        /// </param>
+        internal void Acknowledge(Message.AckType ackType)
+        {
+            // Iterate over a snapshot of the list, presumably because the Ack* helpers may modify it.
+            foreach(IMessageDelivery delivery in delivered.ToArray())
+            {
+                switch (ackType)
+                {
+                    case Message.AckType.ACCEPTED:
+                        AckConsumed(delivery);
+                        break;
+                    case Message.AckType.MODIFIED_FAILED:
+                        AckModified(delivery, true);
+                        break;
+                    case Message.AckType.MODIFIED_FAILED_UNDELIVERABLE:
+                        AckModified(delivery, true, true);
+                        break;
+                    case Message.AckType.REJECTED:
+                        AckRejected(delivery);
+                        break;
+                    case Message.AckType.RELEASED:
+                        AckReleased(delivery);
+                        break;
+                    default:
+                        // Both placeholders must be indexed; an empty "{}" is not a valid format item.
+                        Tracer.WarnFormat("Unknown message acknowledgement type {0} for message {1}", ackType, delivery.Message.NMSMessageId);
+                        break;
+                }
+            }
+            delivered.Clear();
+        }
+
+        #endregion
+
+        #region IMessageConsumer Properties
+
+        /// <summary>
+        /// Not supported by this consumer: both accessors throw
+        /// <see cref="NotImplementedException"/>.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get => throw new NotImplementedException();
+            set => throw new NotImplementedException();
+        }
+
+        #endregion
+
+        #region IMessageConsumer Events
+        protected event MessageListener OnMessage;
+
+        /// <summary>
+        /// Registers or unregisters message listeners. Both operations are refused
+        /// with an <see cref="IllegalStateException"/> while the consumer is started.
+        /// A null handler is ignored.
+        /// </summary>
+        public event MessageListener Listener
+        {
+            add
+            {
+                if (IsStarted)
+                {
+                    throw new IllegalStateException("Cannot add MessageListener to consumer " + Id + " on a started Connection.");
+                }
+                if (value == null)
+                {
+                    return;
+                }
+                OnMessage += value;
+            }
+            remove
+            {
+                if (IsStarted)
+                {
+                    throw new IllegalStateException("Cannot remove MessageListener to consumer " + Id + " on a started Connection.");
+                }
+                if (value == null)
+                {
+                    return;
+                }
+                OnMessage -= value;
+            }
+        }
+
+        #endregion
+
+        #region IMessageConsumer Methods
+        
+        /// <summary>
+        /// Synchronously receives the next message, dequeuing with a timeout value of -1.
+        /// </summary>
+        /// <returns>A prepared copy of the dequeued message, or null when nothing was dequeued.</returns>
+        public IMessage Receive()
+        {
+            ThrowIfClosed();
+            ThrowIfAsync();
+
+            if (!TryDequeue(out IMessageDelivery received, -1))
+            {
+                return null;
+            }
+
+            // The application gets a copy; the original delivery is what gets acknowledged.
+            Message.Message result = received.Message.Copy();
+            PrepareMessageForDelivery(result);
+            AckReceived(received);
+            return result;
+        }
+
+        /// <summary>
+        /// Synchronously receives the next message, waiting at most <paramref name="timeout"/>.
+        /// </summary>
+        /// <param name="timeout">
+        /// Maximum time to wait. A zero timeout is passed to the dequeue as -1, the same
+        /// value the parameterless Receive() uses. Timeouts beyond the range of a 32-bit
+        /// millisecond count are clamped instead of overflowing, and a positive timeout
+        /// shorter than one millisecond waits one millisecond.
+        /// </param>
+        /// <returns>A prepared copy of the dequeued message, or null when nothing was dequeued.</returns>
+        public IMessage Receive(TimeSpan timeout)
+        {
+            ThrowIfClosed();
+            ThrowIfAsync();
+
+            double requestedMillis = timeout.TotalMilliseconds;
+            int timeoutMilis;
+            if (requestedMillis >= int.MaxValue)
+            {
+                // Convert.ToInt32 would throw OverflowException (e.g. TimeSpan.MaxValue).
+                timeoutMilis = int.MaxValue;
+            }
+            else if (requestedMillis <= int.MinValue)
+            {
+                timeoutMilis = int.MinValue;
+            }
+            else
+            {
+                timeoutMilis = Convert.ToInt32(requestedMillis);
+            }
+
+            if (timeoutMilis == 0)
+            {
+                // A positive sub-millisecond timeout rounds to 0 and must stay a bounded wait;
+                // only a timeout that is not positive maps to -1.
+                timeoutMilis = requestedMillis > 0 ? 1 : -1;
+            }
+
+            if (TryDequeue(out IMessageDelivery delivery, timeoutMilis))
+            {
+                Message.Message copy = delivery.Message.Copy();
+                PrepareMessageForDelivery(copy);
+                AckReceived(delivery);
+                return copy;
+            }
+            return null;
+        }
+
+        /// <summary>
+        /// Synchronously receives the next message, dequeuing with a timeout value of 0.
+        /// </summary>
+        /// <returns>A prepared copy of the dequeued message, or null when nothing was dequeued.</returns>
+        public IMessage ReceiveNoWait()
+        {
+            ThrowIfClosed();
+            ThrowIfAsync();
+
+            if (!TryDequeue(out IMessageDelivery received, 0))
+            {
+                return null;
+            }
+
+            // The application gets a copy; the original delivery is what gets acknowledged.
+            Message.Message result = received.Message.Copy();
+            PrepareMessageForDelivery(result);
+            AckReceived(received);
+            return result;
+        }
+
+        #endregion
+
+        #region MessageLink Methods
+
+        /// <summary>
+        /// Creates the target terminus for the receiver link; no fields are set on it.
+        /// </summary>
+        private Target CreateTarget()
+        {
+            return new Target();
+        }
+
+        /// <summary>
+        /// Builds the source terminus for the receiver link: address, supported
+        /// outcomes, default outcome, expiry policy and durability (depending on
+        /// whether the consumer is durable), distribution mode, capabilities and
+        /// the optional no-local and selector filters.
+        /// </summary>
+        private Source CreateSource()
+        {
+            Source source = new Source
+            {
+                Address = UriUtil.GetAddress(Destination, this.Session.Connection),
+                Outcomes = new Amqp.Types.Symbol[]
+                {
+                    SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
+                    SymbolUtil.ATTACH_OUTCOME_RELEASED,
+                    SymbolUtil.ATTACH_OUTCOME_REJECTED,
+                    SymbolUtil.ATTACH_OUTCOME_MODIFIED
+                },
+                DefaultOutcome = MessageSupport.MODIFIED_FAILED_INSTANCE
+            };
+
+            // Durable consumers never expire and use the copy distribution mode;
+            // the others expire with the session.
+            if (IsDurable)
+            {
+                source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_NEVER;
+                source.Durable = (int)TerminusDurability.UNSETTLED_STATE;
+                source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
+            }
+            else
+            {
+                source.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_SESSION_END;
+                source.Durable = (int)TerminusDurability.NONE;
+            }
+
+            // Browsers also use the copy distribution mode.
+            if (IsBrowser)
+            {
+                source.DistributionMode = SymbolUtil.ATTACH_DISTRIBUTION_MODE_COPY;
+            }
+
+            source.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
+
+            // TODO add filters for noLocal and Selector using appropriate Amqp Described types
+            Amqp.Types.Map filterSet = new Amqp.Types.Map();
+
+            // No Local
+            // qpid jms defines a no local filter as an amqp described type
+            //      AmqpJmsNoLocalType where
+            //          Descriptor = 0x0000468C00000003UL
+            //          Described = "NoLocalFilter{}" (type string)
+            if (consumerInfo.NoLocal)
+            {
+                filterSet.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, "NoLocalFilter{}");
+            }
+
+            // Selector
+            // qpid jms defines a selector filter as an amqp described type
+            //      AmqpJmsSelectorType where
+            //          Descriptor = 0x0000468C00000004UL
+            //          Described = "<selector_string>" (type string)
+            if (HasSelector)
+            {
+                filterSet.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, consumerInfo.Selector);
+            }
+
+            // The filter set is attached only when at least one filter was added.
+            if (filterSet.Count > 0)
+            {
+                source.FilterSet = filterSet;
+            }
+
+            return source;
+        }
+        
+        /// <summary>
+        /// Creates the AMQP receiver link for this consumer. Durable consumers use
+        /// the subscription name as the link name; all others use
+        /// "nms:receiver:" + consumer id, followed by ":" + address when the source
+        /// address is not empty.
+        /// </summary>
+        protected override ILink CreateLink()
+        {
+            Attach attach = new Amqp.Framing.Attach()
+            {
+                Target = CreateTarget(),
+                Source = CreateSource(),
+                RcvSettleMode = ReceiverSettleMode.First,
+                SndSettleMode = IsBrowser ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
+            };
+
+            string linkName;
+            if (IsDurable)
+            {
+                linkName = consumerInfo.SubscriptionName;
+            }
+            else
+            {
+                string address = (attach.Source as Source).Address ?? "";
+                string prefix = "nms:receiver:" + this.ConsumerId.ToString();
+                string addressSuffix = address.Length == 0 ? "" : ":" + address;
+                linkName = prefix + addressSuffix;
+            }
+
+            IReceiverLink receiver = new ReceiverLink(Session.InnerSession as Amqp.Session, linkName, attach, OnAttachResponse);
+            return receiver;
+        }
+
+        /// <summary>
+        /// Handles a close notification for the link: logs it at debug level,
+        /// delegates to the base implementation and then calls OnResponse().
+        /// </summary>
+        /// <param name="sender">AMQP object that was closed.</param>
+        /// <param name="error">Error attached to the close; may be null.</param>
+        protected override void OnInternalClosed(IAmqpObject sender, Error error)
+        {
+            if (Tracer.IsDebugEnabled)
+            {
+                string subscriptionText = IsDurable ? "with subscription name " + this.consumerInfo.SubscriptionName : "";
+                string causeText = error == null ? "" : "with cause " + error;
+                Tracer.DebugFormat("Received Close notification for MessageConsumer {0} {1} {2}",
+                    this.Id, subscriptionText, causeText);
+            }
+
+            base.OnInternalClosed(sender, error);
+            OnResponse();
+        }
+        
+        /// <summary>
+        /// Stops message delivery for this consumer: stops the message queue and then
+        /// waits for a running message listener callback to finish.
+        /// </summary>
+        /// <exception cref="IllegalStateException">
+        /// Thrown when called on the session dispatch thread, i.e. from inside a
+        /// message listener.
+        /// </exception>
+        protected override void StopResource()
+        {
+            if (Session.Dispatcher.IsOnDispatchThread)
+            {
+                // Build the text explicitly: the (string, string) constructor takes an error code,
+                // not a format argument, so a "{0}" placeholder would never be substituted.
+                throw new IllegalStateException("Cannot stop Connection " + Session.Connection.ClientId + " in MessageListener.");
+            }
+            // Cut message window
+            // TODO figure out draining message window without raising a closed window exception (link-credit-limit-exceeded Error) from amqpnetlite.
+            //SendFlow(1);
+            // Stop message delivery
+            this.messageQueue.Stop();
+            // Now wait until the MessageListener callback has finished executing.
+            this.WaitOnMessageListenerEvent();
+        }
+
+        /// <summary>
+        /// Starts message delivery: runs the base start logic, starts the message
+        /// queue, schedules a dispatch for messages that are already queued, and
+        /// starts the link with the configured credit and the inbound message callback.
+        /// </summary>
+        protected override void StartResource()
+        {
+            // The base class performs the attach request if it has not been done yet.
+            base.StartResource();
+
+            // Resume local delivery and hand queued messages to a registered listener.
+            messageQueue.Start();
+            DrainMessageQueueIfAny();
+
+            // Register the transport callback, then open the message window.
+            OnInboundAMQPMessage = OnInboundMessage;
+            Link.Start(consumerInfo.LinkCredit, OnInboundAMQPMessage);
+        }
+
+        
+        
+        /// <summary>
+        /// Executes the AMQP network detach operation.
+        /// </summary>
+        /// <param name="timeout">
+        /// Timeout to wait for the detach response. A timeout of 0 or less will not block to wait for a response.
+        /// </param>
+        /// <param name="cause">Error to detach link. Can be null.</param>
+        /// <exception cref="Amqp.AmqpException">
+        /// Throws when an error occurs during amqp detach. Or contains Error response from detach.
+        /// </exception>
+        /// <exception cref="System.TimeoutException">
+        /// Throws when detach response is not received in specified timeout.
+        /// </exception>
+        protected override void DoClose(TimeSpan timeout, Error cause = null)
+        {
+            if (!IsDurable)
+            {
+                base.DoClose(timeout, cause);
+                return;
+            }
+
+            // Durable consumers detach the link instead of using the base close.
+            Task detachTask = this.Link.DetachAsync(cause);
+            if (TimeSpan.Compare(timeout, TimeSpan.Zero) <= 0)
+            {
+                // Non-positive timeout: do not wait for the detach response.
+                return;
+            }
+
+            /*
+             * AmqpNetLite does not allow a timeout to be specified for a link detach request even though
+             * it uses the same close operation which takes a parameter for timeout. AmqpNetLite uses
+             * its default timeout of 60000ms, see AmqpObject.DefaultTimeout, for the detach close
+             * operation forcing the detach request to be synchronous. To allow for an asynchronous detach
+             * request an NMS MessageConsumer must call the DetachAsync method on a link which will block
+             * for up to 60000ms asynchronously to set a timeout exception or complete the task.
+             */
+            const int amqpNetLiteDefaultTimeoutMillis = 60000; // taken from AmqpObject.DefaultTimeout
+            TimeSpan amqpNetLiteDefaultTimeout = TimeSpan.FromMilliseconds(amqpNetLiteDefaultTimeoutMillis);
+            // Create timeout which allows for the 60000ms block in the DetachAsync task.
+            TimeSpan actualTimeout = amqpNetLiteDefaultTimeout + timeout;
+
+            TaskUtil.Wait(detachTask, actualTimeout);
+
+            // Task.Exception is always an AggregateException; surface its first inner
+            // exception and keep the stack trace of the original failure.
+            AggregateException failure = detachTask.Exception;
+            if (failure != null)
+            {
+                Exception inner = failure.InnerException ?? failure;
+                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(inner).Throw();
+            }
+        }
+        /// <summary>
+        /// Override of the template method <see cref="MessageLink.Shutdown"/> specific to
+        /// <see cref="MessageConsumer"/>: closes the consumer's message queue.
+        /// </summary>
+        internal override void Shutdown() => messageQueue.Close();
+
+        #endregion
+
+        #region IDisposable Methods
+        /// <summary>
+        /// Closes the consumer. Any exception raised by the close is logged at
+        /// debug level and not propagated.
+        /// </summary>
+        public void Dispose()
+        {
+            try
+            {
+                Close();
+            }
+            catch (Exception failure)
+            {
+                Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", GetType().Name, Id, failure);
+            }
+        }
+        /// <summary>
+        /// Logs the consumer statistics when the consumer is neither closing nor
+        /// closed, runs the base disposal and disposes the listener-in-use event.
+        /// </summary>
+        /// <param name="disposing">Forwarded unchanged to the base implementation.</param>
+        protected override void Dispose(bool disposing)
+        {
+            bool closingOrClosed = IsClosing || IsClosed;
+            if (!closingOrClosed)
+            {
+                Tracer.InfoFormat("Consumer {0} stats: Transport Msgs {1}, Dispatch Msgs {2}, messageQueue {3}.",
+                    Id, transportMsgCount, messageDispatchCount, messageQueue.Count);
+            }
+
+            base.Dispose(disposing);
+            MessageListenerInUseEvent.Dispose();
+        }
+
+
+        #endregion
+
+        #region Inner MessageListenerDispatchEvent Class
+
+        /// <summary>
+        /// Dispatch event that drains the owning consumer's message queue and hands
+        /// each message to the consumer's registered message listeners.
+        /// </summary>
+        protected class MessageListenerDispatchEvent : WaitableDispatchEvent 
+        {
+            private readonly MessageConsumer consumer;
+            
+            internal MessageListenerDispatchEvent(MessageConsumer consumer) : base()
+            {
+                this.consumer = consumer;
+                Callback = this.DispatchMessageListeners;
+            }
+
+            /// <summary>
+            /// Runs the base failure handling and reports the exception to the session.
+            /// </summary>
+            public override void OnFailure(Exception e)
+            {
+                base.OnFailure(e);
+                consumer.Session.OnException(e);
+            }
+
+            /// <summary>
+            /// Dequeues messages without waiting until the queue yields none, processing
+            /// each one in turn. Does nothing when the consumer is closed.
+            /// </summary>
+            public void DispatchMessageListeners()
+            {
+                if (consumer.IsClosed) return;
+                consumer.EnterMessageListenerEvent();
+                try
+                {
+                    IMessageDelivery delivery;
+                    while ((delivery = consumer.messageQueue.DequeueNoWait()) != null)
+                    {
+                        consumer.AddTaskRef();
+                        consumer.messageDispatchCount++;
+                        try
+                        {
+                            ProcessDelivery(delivery);
+                        }
+                        catch (Exception e)
+                        {
+                            // unhandled failure
+                            consumer.Session.OnException(e);
+                        }
+                        finally
+                        {
+                            // Always pair AddTaskRef with RemoveTaskRef, even when the
+                            // session exception handler itself throws.
+                            consumer.RemoveTaskRef();
+                        }
+                    }
+                }
+                finally
+                {
+                    // Always leave the listener event so it is not left entered
+                    // when an exception escapes the loop.
+                    consumer.LeaveMessageListenerEvent();
+                }
+            }
+
+            /// <summary>
+            /// Processes one dequeued delivery: expired or redelivery-exceeded messages
+            /// are acknowledged as modified; everything else is copied, prepared and
+            /// passed to the message listeners.
+            /// </summary>
+            private void ProcessDelivery(IMessageDelivery delivery)
+            {
+                Message.Message nmsProviderMessage = delivery.Message;
+
+                if (consumer.IsMessageExpired(delivery))
+                {
+                    consumer.AckModified(delivery, true);
+                    return;
+                }
+                if (consumer.IsMessageRedeliveryExceeded(delivery))
+                {
+                    consumer.AckModified(delivery, true, true);
+                    return;
+                }
+
+                bool deliveryFailed = false;
+                bool isAutoOrDupsOk = consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.AutoAcknowledge) ||
+                                      consumer.Session.AcknowledgementMode.Equals(AcknowledgementMode.DupsOkAcknowledge);
+                if (isAutoOrDupsOk)
+                {
+                    // Tracked here and acknowledged after the listener returns.
+                    consumer.delivered.AddLast(delivery);
+                }
+                else
+                {
+                    consumer.AckReceived(delivery);
+                }
+
+                Message.Message copy = nmsProviderMessage.Copy();
+                try
+                {
+                    consumer.Session.ClearRecovered();
+                    consumer.PrepareMessageForDelivery(copy);
+                    if (Tracer.IsDebugEnabled)
+                        Tracer.DebugFormat("Invoking Client Message Listener Callback for message {0}.", copy.NMSMessageId);
+                    consumer.OnMessage(copy);
+                }
+                catch (SystemException se)
+                {
+                    // NOTE(review): only SystemException marks the delivery as failed; any other
+                    // exception type propagates to the caller's handler - confirm this is intended.
+                    Tracer.WarnFormat("Caught Exception on MessageListener for Consumer {0}. Message {1}.", consumer.Id, se.Message);
+                    deliveryFailed = true;
+                }
+
+                // Skipped when the session reports IsRecovered after the listener ran.
+                if (isAutoOrDupsOk && !consumer.Session.IsRecovered)
+                {
+                    if (!deliveryFailed)
+                    {
+                        consumer.AckConsumed(delivery);
+                    }
+                    else
+                    {
+                        consumer.AckReleased(delivery);
+                    }
+                }
+            }
+        }
+
+        #endregion
+    }
+
+    #region Info class
+    /// <summary>
+    /// Configuration values for a message consumer link: link credit, selector,
+    /// subscription name and the no-local flag.
+    /// </summary>
+    internal class ConsumerInfo : LinkInfo
+    {
+        // Credit reported by LinkCredit when no explicit value has been assigned.
+        protected const int DEFAULT_CREDIT = 200;
+
+        // Explicitly assigned credit; null until LinkCredit is set.
+        private int? credit;
+
+        internal ConsumerInfo(Id id) : base(id) { }
+
+        public int LinkCredit
+        {
+            get => credit.GetValueOrDefault(DEFAULT_CREDIT);
+            internal set => credit = value;
+        }
+
+        public string Selector { get; internal set; }
+
+        public string SubscriptionName { get; internal set; }
+
+        public bool NoLocal { get; internal set; }
+    }
+
+    #endregion
+
+}