You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:45 UTC

[08/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/TemporaryLink.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/TemporaryLink.cs b/src/main/csharp/TemporaryLink.cs
new file mode 100644
index 0000000..fbd011c
--- /dev/null
+++ b/src/main/csharp/TemporaryLink.cs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using Amqp;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP
+{
+    internal class TemporaryLink : MessageLink
+    {
+        private readonly string TOPIC = "Topic";
+        private readonly string QUEUE = "Queue";
+        private readonly string CREATOR_TOPIC = "temp-topic-creator";
+        private readonly string CREATOR_QUEUE = "temp-queue-creator";
+
+        internal TemporaryLink(Session session, TemporaryDestination destination) : base(session, destination)
+        {
+            Info = new TemporaryLinkInfo(destination.DestinationId);
+            this.RequestTimeout = session.Connection.RequestTimeout;
+        }
+
+        private TemporaryDestination TemporaryDestination { get => Destination as TemporaryDestination; }
+
+        private bool IsTopic { get => TemporaryDestination.IsTopic; }
+
+        private string LocalDestinationName { get => TemporaryDestination.DestinationId.ToString(); }
+
+        private string DestinationTypeName { get { return IsTopic ? TOPIC : QUEUE; } }
+
+        private void OnAttachResponse(ILink link, Attach attachResponse)
+        {
+            Tracer.InfoFormat("Received attach response for Temporary creator link. Link = {0}, Attach = {1}", link.Name, attachResponse);
+            Target target = (attachResponse.Target as Amqp.Framing.Target);
+            if(target?.Address != null)
+            {
+                this.TemporaryDestination.DestinationName = target.Address;
+            }
+            this.OnResponse();
+        }
+
+        private Source CreateSource()
+        {
+            Source result = new Source();
+
+            return result;
+        }
+
+        private Target CreateTarget()
+        {
+            Target result = new Target();
+            result.Durable = (uint)TerminusDurability.NONE;
+
+            result.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
+            result.Dynamic = true;
+
+            result.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_LINK_DETACH;
+            Amqp.Types.Fields dnp = new Amqp.Types.Fields();
+            dnp.Add(
+                SymbolUtil.ATTACH_DYNAMIC_NODE_PROPERTY_LIFETIME_POLICY,
+                SymbolUtil.DELETE_ON_CLOSE
+                );
+            result.DynamicNodeProperties = dnp;
+
+            return result;
+        }
+
+        private Attach CreateAttach()
+        {
+            Attach result = new Attach()
+            {
+                Source = CreateSource(),
+                Target = CreateTarget(),
+                SndSettleMode = SenderSettleMode.Unsettled,
+                RcvSettleMode = ReceiverSettleMode.First,
+            };
+
+            return result;
+        }
+
+        protected override ILink CreateLink()
+        {
+            Amqp.Session parentImpl = this.Session.InnerSession as Amqp.Session;
+            string linkDestinationName = "apache-nms:" + ((IsTopic) ? CREATOR_TOPIC : CREATOR_QUEUE ) + LocalDestinationName;
+            SenderLink link = new SenderLink(parentImpl, linkDestinationName, CreateAttach(), OnAttachResponse);
+            return link;
+        }
+
+        protected override void OnInternalClosed(IAmqpObject sender, Error error)
+        {
+            base.OnInternalClosed(sender, error);
+            this.OnResponse();
+        }
+
+        internal override void Shutdown()
+        {
+            base.Shutdown();
+            this.Session.Connection.Remove(this.TemporaryDestination);
+        }
+
+        protected override void StopResource()
+        {
+        }
+
+    }
+
+    internal class TemporaryLinkInfo : LinkInfo
+    {
+        public TemporaryLinkInfo (Id id) : base(id)
+        {
+
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Topic.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Topic.cs b/src/main/csharp/Topic.cs
new file mode 100644
index 0000000..4db4ee6
--- /dev/null
+++ b/src/main/csharp/Topic.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP
+{
+    /// <summary>
+    /// Apache.NMS.AMQP.Topic implements Apache.NMS.ITopic
+    /// Topic is an concrete implementation for an abstract Destination.
+    /// </summary>
+    class Topic : Destination, ITopic
+    {
+        
+        #region Constructor
+
+        internal Topic(Connection conn, string topicString) : base(conn, topicString, false) {}
+
+        #endregion
+
+        #region Destination Methods
+
+        protected override void ValidateName(string name)
+        {
+            
+        }
+
+        #endregion
+
+        #region Destination Properties
+
+        public override DestinationType DestinationType
+        {
+            get
+            {
+                return DestinationType.Topic;
+            }
+        }
+
+        #endregion
+
+        #region ITopic Properties
+
+        public string TopicName
+        {
+            get
+            {
+                return destinationName;
+            }
+        }
+
+        #endregion
+
+        #region IDisposable Methods
+
+        protected override void Dispose(bool disposing)
+        {
+            base.Dispose(disposing);
+        }
+
+        #endregion
+    }
+
+    /// <summary>
+    /// Apache.NMS.AMQP.TemporaryTopic implements Apache.NMS.ITemporaryTopic.
+    /// TemporaryTopic is an concrete implementation for an abstract TemporaryDestination.
+    /// </summary>
+    class TemporaryTopic :  TemporaryDestination, ITemporaryTopic
+    {
+        #region Constructor
+
+        internal TemporaryTopic(Connection conn) : base(conn, conn.TemporaryTopicGenerator.GenerateId(), false) { }
+
+        internal TemporaryTopic(Connection conn, string destinationName) : base(conn, destinationName, false) { }
+
+        #endregion
+
+        #region Destination Methods
+
+        protected override void ValidateName(string name)
+        {
+            
+        }
+
+        #endregion
+
+        #region Destination Properties
+
+        public override DestinationType DestinationType
+        {
+            get
+            {
+                return DestinationType.TemporaryTopic;
+            }
+        }
+
+        #endregion
+
+        #region ITopic Properties
+
+        public string TopicName
+        {
+            get
+            {
+                return destinationName;
+            }
+        }
+
+        #endregion
+
+        #region TemporaryDestination Methods
+
+        public override void Delete()
+        {
+            base.Delete();
+        }
+
+        #endregion 
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/AMQP/TransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/AMQP/TransportContext.cs b/src/main/csharp/Transport/AMQP/TransportContext.cs
new file mode 100644
index 0000000..2991ae9
--- /dev/null
+++ b/src/main/csharp/Transport/AMQP/TransportContext.cs
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Net.Sockets;
+using System.Collections.Specialized;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Transport.AMQP
+{
+
+    #region Transport Context
+
+    /// <summary>
+    /// Transport management is mainly handled by the AmqpNetLite library, Except for custom transports.
+    /// TransportContext should configure the Amqp.ConnectionFactory for the tcp transport properties.
+    /// </summary>
+    internal class TransportContext : IProviderTransportContext
+    {
+        protected ConnectionFactory factory = null;
+
+        protected Amqp.ConnectionFactory connectionBuilder = null;
+        
+        internal TransportContext(ConnectionFactory connectionFactory)
+        {
+            factory = connectionFactory;
+            connectionBuilder = factory.Factory as Amqp.ConnectionFactory;
+            connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
+        }
+
+        // Copy Contructor
+        protected TransportContext() { }
+
+        public Apache.NMS.IConnectionFactory Owner { get { return factory; } }
+
+        #region Transport Options
+
+        public int ReceiveBufferSize { get => this.connectionBuilder.TCP.ReceiveBufferSize; set => this.connectionBuilder.TCP.ReceiveBufferSize = value; }
+        public int ReceiveTimeout { get => this.connectionBuilder.TCP.ReceiveTimeout; set => this.connectionBuilder.TCP.ReceiveTimeout = value; }
+        public int SendBufferSize { get => this.connectionBuilder.TCP.SendBufferSize; set => this.connectionBuilder.TCP.SendBufferSize = value; }
+        public int SendTimeout { get => this.connectionBuilder.TCP.SendTimeout; set => this.connectionBuilder.TCP.SendTimeout = value; }
+        public bool TcpNoDelayEnabled { get => this.connectionBuilder.TCP.NoDelay; set => this.connectionBuilder.TCP.NoDelay = value; }
+        public uint TcpKeepAliveTime { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime = value; }
+        public uint TcpKeepAliveInterval { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval = value; }
+
+        public bool SocketLingerEnabled
+        {
+            get => this.connectionBuilder.TCP?.LingerOption.Enabled ?? (this.connectionBuilder.TCP.LingerOption = new LingerOption(false, 0)).Enabled;
+            set
+            {
+                if (this.connectionBuilder.TCP.LingerOption == null)
+                {
+                    (this.connectionBuilder.TCP.LingerOption = new LingerOption(false, 0)).Enabled = value;
+                }
+                else
+                {
+                    this.connectionBuilder.TCP.LingerOption.Enabled = value;
+                }
+            }
+        }
+
+        public int SocketLingerTime
+        {
+            get => this.connectionBuilder.TCP?.LingerOption.LingerTime ?? (this.connectionBuilder.TCP.LingerOption = new LingerOption(false, 0)).LingerTime;
+            set
+            {
+                if (this.connectionBuilder.TCP.LingerOption == null)
+                {
+                    (this.connectionBuilder.TCP.LingerOption = new LingerOption(false, 0)).LingerTime = value;
+                }
+                else
+                {
+                    this.connectionBuilder.TCP.LingerOption.LingerTime = value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// UseLogging Enables AmqpNetLite's Frame logging level.
+        /// </summary>
+        public bool UseLogging
+        {
+            get => ((Amqp.Trace.TraceLevel & Amqp.TraceLevel.Frame) == Amqp.TraceLevel.Frame);
+            set
+            {
+                if (value)
+                {
+                    Amqp.Trace.TraceLevel = Amqp.Trace.TraceLevel | Amqp.TraceLevel.Frame;
+                }
+                else
+                {
+                    Amqp.Trace.TraceLevel = Amqp.Trace.TraceLevel & ~Amqp.TraceLevel.Frame;
+                }
+            }
+        }
+        
+        #endregion
+
+        public virtual ProviderCreateConnection CreateConnectionBuilder()
+        {
+            return new ProviderCreateConnection(connectionBuilder.CreateAsync);
+        }
+
+        public virtual IProviderTransportContext Copy()
+        {
+            TransportContext copy = new TransportContext();
+            this.CopyInto(copy);
+            return copy;
+        }
+
+        protected virtual void CopyInto(TransportContext copy)
+        {
+            copy.factory = this.factory;
+            copy.UseLogging = this.UseLogging;
+            Amqp.ConnectionFactory builder = new Amqp.ConnectionFactory();
+            this.CopyBuilder(builder);
+            copy.connectionBuilder = builder;
+        }
+
+        protected virtual void CopyBuilder(Amqp.ConnectionFactory copy)
+        {
+            StringDictionary amqpProperties = PropertyUtil.GetProperties(this.connectionBuilder.AMQP);
+            StringDictionary tcpProperties = PropertyUtil.GetProperties(this.connectionBuilder.TCP);
+            PropertyUtil.SetProperties(copy.AMQP, amqpProperties);
+            PropertyUtil.SetProperties(copy.TCP, tcpProperties);
+            copy.SASL.Profile = this.connectionBuilder.SASL.Profile;
+        }
+
+    }
+
+    #endregion
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/IProviderTransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/IProviderTransportContext.cs b/src/main/csharp/Transport/IProviderTransportContext.cs
new file mode 100644
index 0000000..9ccfd64
--- /dev/null
+++ b/src/main/csharp/Transport/IProviderTransportContext.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.AMQP.Transport
+{
+    /// <summary>
+    /// 
+    /// </summary>
+    internal interface IProviderTransportContext
+    {
+        Apache.NMS.IConnectionFactory Owner { get; }
+
+        #region Transport Options
+
+        int ReceiveBufferSize { get; set; }
+
+        int ReceiveTimeout { get; set; }
+
+        int SendBufferSize { get; set; }
+
+        int SendTimeout { get; set; }
+        
+        bool UseLogging { get; set; }
+
+        #endregion
+
+        ProviderCreateConnection CreateConnectionBuilder();
+
+        IProviderTransportContext Copy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs b/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs
new file mode 100644
index 0000000..f3b5549
--- /dev/null
+++ b/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using System.Security.Cryptography.X509Certificates;
+using System.Net.Security;
+using System.Security.Authentication;
+using System.Collections.Generic;
+using System.Collections;
+using System.Threading.Tasks;
+
+using Apache.NMS;
+
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.AMQP.Transport.AMQP;
+using System.Collections.Specialized;
+
+namespace Apache.NMS.AMQP.Transport.Secure.AMQP
+{
+
+    /// <summary>
+    /// Secure Transport management is mainly handled by the AmqpNetLite library, Except for certicate selection and valibdation.
+    /// SecureTransportContext should configure the Amqp.ConnectionFactory for the ssl transport properties.
+    /// </summary>
+    internal class SecureTransportContext : TransportContext, IProviderSecureTransportContext
+    {
+        
+        private readonly static List<string> SupportedProtocols;
+        private readonly static Dictionary<string, int> SupportedProtocolValues;
+
+        #region static Initializer
+
+        static SecureTransportContext()
+        {
+            const string Default = "Default";
+            const string None = "None";
+            SupportedProtocols = new List<string>();
+            SupportedProtocolValues = new Dictionary<string, int>();
+            foreach (string name in Enum.GetNames(typeof(System.Security.Authentication.SslProtocols)))
+            {
+                if (name.Equals(Default, StringComparison.CurrentCultureIgnoreCase) ||
+                   name.Equals(None, StringComparison.CurrentCultureIgnoreCase))
+                {
+                    // ignore
+                }
+                else
+                {
+                    SupportedProtocols.Add(name);
+                }
+
+            }
+            foreach (int value in Enum.GetValues(typeof(System.Security.Authentication.SslProtocols)))
+            {
+                SslProtocols p = (System.Security.Authentication.SslProtocols)value;
+                if (p.Equals(SslProtocols.Default) ||
+                   p.Equals(SslProtocols.None))
+                {
+                    // ignore
+                }
+                else 
+                {
+                    string name = ((SslProtocols)value).ToString().ToLower();
+                    SupportedProtocolValues.Add(name, value);
+                }
+            }
+            if (Tracer.IsDebugEnabled)
+            {
+                Tracer.DebugFormat("Supported SSL protocols list {0}", Util.PropertyUtil.ToString(SupportedProtocols));
+            }
+        }
+
+        #endregion
+
+        
+        #region Constructors
+
+        internal SecureTransportContext(ConnectionFactory factory) : base(factory)
+        {
+            this.connectionBuilder.SSL.LocalCertificateSelectionCallback = this.ContextLocalCertificateSelect;
+            this.connectionBuilder.SSL.RemoteCertificateValidationCallback = this.ContextServerCertificateValidation;
+            connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
+        }
+
+        // Copy Contructor
+        protected SecureTransportContext() : base() { }
+
+        #endregion
+
+        #region Secure Transport Context Properties
+
+        public string KeyStoreName { get; set; }
+
+        public string KeyStorePassword { get; set; }
+
+        public string ClientCertFileName { get; set; }
+
+        public bool AcceptInvalidBrokerCert { get; set; } = false;
+        
+        public string ClientCertSubject { get; set; }
+        public string ClientCertPassword { get; set; }
+        public string KeyStoreLocation { get; set; }
+        public string SSLProtocol
+        {
+            get
+            {
+                return this.connectionBuilder?.SSL.Protocols.ToString();
+            }
+            set
+            {
+                this.connectionBuilder.SSL.Protocols = GetSslProtocols(value);
+            }
+        }
+
+        public bool CheckCertificateRevocation
+        {
+            get
+            {
+                return this.connectionBuilder?.SSL.CheckCertificateRevocation ?? false;
+            }
+            set
+            {
+                if(this.connectionBuilder != null)
+                    this.connectionBuilder.SSL.CheckCertificateRevocation = value;
+            }
+        }
+
+
+        public string ServerName { get; set; }
+
+        public RemoteCertificateValidationCallback ServerCertificateValidateCallback { get; set; }
+        public LocalCertificateSelectionCallback ClientCertificateSelectCallback { get; set; }
+
+        #endregion
+
+        #region Private Methods
+        // These are the default values given by amqpnetlite.
+        private static readonly SslProtocols DefaultProtocols = (new Amqp.ConnectionFactory()).SSL.Protocols;
+
+
+        private SslProtocols GetSslProtocols(string protocolString)
+        {
+            
+            if (!String.IsNullOrWhiteSpace(protocolString))
+            {
+                SslProtocols value = DefaultProtocols;
+                if(Enum.TryParse(protocolString, true, out value))
+                {
+                    return value;
+                }
+                else
+                {
+                    throw new InvalidPropertyException(SecureTransportPropertyInterceptor.SSL_PROTOCOLS_PROPERTY, string.Format("Failed to parse value {0}", protocolString));
+                }
+            }
+            else
+            {
+                return DefaultProtocols;
+            }
+            
+        }
+
+        private X509Certificate2Collection LoadClientCertificates()
+        {
+            X509Certificate2Collection certificates = new X509Certificate2Collection();
+
+            if(!String.IsNullOrWhiteSpace(this.ClientCertFileName))
+            {
+                Tracer.DebugFormat("Attempting to load Client Certificate file: {0}", this.ClientCertFileName);
+                X509Certificate2 certificate = new X509Certificate2(this.ClientCertFileName, this.ClientCertPassword);
+                Tracer.DebugFormat("Loaded Client Certificate: {0}", certificate.Subject);
+
+                certificates.Add(certificate);
+            }
+            else
+            {
+                string storeName = String.IsNullOrWhiteSpace(this.KeyStoreName) ? StoreName.My.ToString() : this.KeyStoreName;
+                StoreLocation storeLocation = StoreLocation.CurrentUser;
+                if(!String.IsNullOrWhiteSpace(this.KeyStoreLocation))
+                {
+                    bool found = false;
+                    foreach(string location in Enum.GetNames(typeof(StoreLocation)))
+                    {
+                        if(String.Compare(this.KeyStoreLocation, location, true) == 0)
+                        {
+                            storeLocation = (StoreLocation)Enum.Parse(typeof(StoreLocation), location, true);
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found)
+                    {
+                        throw new NMSException(string.Format("Invalid Store location {0}", this.KeyStoreLocation), NMSErrorCode.PROPERTY_ERROR);
+                    }
+                }
+
+                Tracer.DebugFormat("Loading store {0}, from location {1}.", storeName, storeLocation.ToString());
+                try
+                {
+                    X509Store store = new X509Store(storeName, storeLocation);
+
+                    store.Open(OpenFlags.ReadOnly);
+                    X509Certificate2[] storeCertificates = new X509Certificate2[store.Certificates.Count];
+                    store.Certificates.CopyTo(storeCertificates, 0);
+                    certificates.AddRange(storeCertificates);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.WarnFormat("Error loading KeyStore, name : {0}; location : {1}. Cause {2}", storeName, storeLocation, ex);
+                    throw ExceptionSupport.Wrap(ex, "Error loading KeyStore.", storeName, storeLocation.ToString());
+                }
+            }
+
+            return certificates;
+        }
+
+        private Task<Amqp.Connection> CreateSecureConnection(Amqp.Address addr, Amqp.Framing.Open open, Amqp.OnOpened onOpened)
+        {   
+            ProviderCreateConnection delagate = base.CreateConnectionBuilder();
+            return delagate.Invoke(addr, open, onOpened);
+        }
+
+        #endregion
+
+        #region IProviderSecureTransportContext Methods
+
+        public override ProviderCreateConnection CreateConnectionBuilder()
+        {
+
+            // Load local certificates
+            this.connectionBuilder.SSL.ClientCertificates.AddRange(LoadClientCertificates());
+            Tracer.DebugFormat("Loading Certificates from {0} possibilit{1}.", this.connectionBuilder.SSL.ClientCertificates.Count, (this.connectionBuilder.SSL.ClientCertificates.Count == 1) ? "y" : "ies");
+
+            // log assigned SSL protocols
+            Tracer.DebugFormat("Set accepted SSL protocols to {0}.", this.SSLProtocol);
+
+            if (this.connectionBuilder.SSL.Protocols == SslProtocols.None)
+            {
+                throw new NMSSecurityException(string.Format("Invalid SSL Protocol {0} selected from system supported protocols {1}", this.SSLProtocol, PropertyUtil.ToString(SupportedProtocols)));
+            }
+
+            return new ProviderCreateConnection(this.CreateSecureConnection);
+        }
+
+        #endregion
+
+        #region Certificate Callbacks
+
+        protected X509Certificate ContextLocalCertificateSelect(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers)
+        {
+            if (Tracer.IsDebugEnabled)
+            {
+                string subjects = "{";
+                string issuers = "{";
+                string acceptedIssuers = "{";
+
+                foreach (X509Certificate cert in localCertificates)
+                {
+                    subjects += cert.Subject + ", ";
+                    issuers += cert.Issuer + ", ";
+                }
+
+                subjects += "}";
+                issuers += "}";
+
+                for (int i = 0; i < acceptableIssuers.Length; i++)
+                {
+                    acceptedIssuers += acceptableIssuers[i] + ", ";
+                }
+
+                Tracer.DebugFormat("Local Certificate Selection.\n" +
+                    "Sender {0}, Target Host {1}, Remote Cert Subject {2}, Remote Cert Issuer {3}" +
+                    "\nlocal Cert Subjects {4}, " +
+                    "\nlocal Cert Issuers {5}",
+                    sender.ToString(),
+                    targetHost,
+                    remoteCertificate?.Subject,
+                    remoteCertificate?.Issuer,
+                    subjects,
+                    issuers);
+            }
+            X509Certificate localCertificate = null;
+            if (ClientCertificateSelectCallback != null)
+            {
+                try
+                {
+                    if (Tracer.IsDebugEnabled) Tracer.DebugFormat("Calling application callback for Local certificate selection.");
+                    localCertificate = ClientCertificateSelectCallback(sender, targetHost, localCertificates, remoteCertificate, acceptableIssuers);
+                }
+                catch (Exception ex)
+                {
+                    Tracer.InfoFormat("Caught Exception from application callback for local certificate selction. Exception : {0}", ex);
+                    throw ex;
+                }
+            }
+            else if (localCertificates.Count >= 1)
+            {
+                // when there is only one certificate select that certificate.
+                localCertificate = localCertificates[0];
+                if (!String.IsNullOrWhiteSpace(this.ClientCertSubject))
+                {
+                    // should the application identify a specific certificate to use search for that certificate.
+                    localCertificate = null;
+                    foreach (X509Certificate cert in localCertificates)
+                    {
+                        if (String.Compare(cert.Subject, this.ClientCertSubject, true) == 0)
+                        {
+                            localCertificate = cert;
+                            break;
+                        }
+                    }
+                    
+                }
+            }
+
+            if (localCertificate == null)
+            {
+                Tracer.InfoFormat("Could not select Local Certificate for target host {0}", targetHost);
+            }
+            else if (Tracer.IsDebugEnabled)
+            {
+                Tracer.DebugFormat("Selected Local Certificate {0}", localCertificate.ToString());
+            }
+
+            return localCertificate;
+        }
+
+        protected bool ContextServerCertificateValidation(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
+        {
+
+            if (Tracer.IsDebugEnabled)
+            {
+                string name = null;
+                if (certificate is X509Certificate2)
+                {
+                    X509Certificate2 cert = certificate as X509Certificate2;
+                    name = cert.SubjectName.Name;
+
+
+                }
+                Tracer.DebugFormat("Cert DN {0}; Cert Subject {1}; Cert Issuer {2}; SSLPolicyErrors [{3}]", name, certificate?.Subject ?? "null", certificate?.Issuer ?? "null", sslPolicyErrors.ToString());
+                try
+                {
+                    X509VerificationFlags verFlags = chain.ChainPolicy.VerificationFlags;
+                    X509RevocationMode revMode = chain.ChainPolicy.RevocationMode;
+                    X509RevocationFlag revFlags = chain.ChainPolicy.RevocationFlag;
+                    StringBuilder sb = new StringBuilder();
+                    sb.Append("ChainStatus={");
+                    int size = sb.Length;
+                    foreach (X509ChainStatus status in chain.ChainStatus)
+                    {
+                        X509ChainStatusFlags csflags = status.Status;
+                        sb.AppendFormat("Info={0}; flags=0x{1:X}; flagNames=[{2}]", status.StatusInformation, csflags, csflags.ToString());
+                        sb.Append(", ");
+                    }
+                    if (size != sb.Length)
+                    {
+                        sb.Remove(sb.Length - 2, 2);
+                    }
+                    sb.Append("}");
+
+                    Tracer.DebugFormat("X.509 Cert Chain, Verification Flags {0:X} {1}, Revocation Mode {2}, Revocation Flags {3}, Status {4} ",
+                        verFlags, verFlags.ToString(), revMode.ToString(), revFlags.ToString(), sb.ToString());
+                }
+                catch (Exception ex)
+                {
+                    Tracer.ErrorFormat("Error displaying Remote Cert fields. Cause: {0}", ex);
+                }
+            }
+
+            bool? valid = null;
+            if (ServerCertificateValidateCallback != null)
+            {
+                try
+                {
+                    if (Tracer.IsDebugEnabled) Tracer.DebugFormat("Calling application callback for Remote Certificate Validation.");
+                    valid = ServerCertificateValidateCallback(sender, certificate, chain, sslPolicyErrors);
+                }
+                catch (Exception ex)
+                {
+                    Tracer.InfoFormat("Caught Exception from application callback for Remote Certificate Validation. Exception : {0}", ex);
+                    throw ex;
+                }
+            }
+            else 
+            {
+                if ((sslPolicyErrors & SslPolicyErrors.RemoteCertificateNameMismatch) == SslPolicyErrors.RemoteCertificateNameMismatch
+                   && !String.IsNullOrWhiteSpace(this.ServerName))
+                {
+                    if (certificate.Subject.IndexOf(string.Format("CN={0}",
+                    this.ServerName), StringComparison.InvariantCultureIgnoreCase) > -1)
+                    {
+                        sslPolicyErrors &= ~(SslPolicyErrors.RemoteCertificateNameMismatch);
+                    }
+                }
+                if (sslPolicyErrors == SslPolicyErrors.None)
+                {
+                    valid = true;
+                }
+                else
+                {
+                    Tracer.WarnFormat("SSL certificate {0} validation error : {1}", certificate.Subject, sslPolicyErrors.ToString());
+                    valid = this.AcceptInvalidBrokerCert;
+                }
+            }
+            return valid ?? this.AcceptInvalidBrokerCert;
+        }
+
+        #endregion
+        
+        #region Copy Methods
+
+
+        protected override void CopyBuilder(Amqp.ConnectionFactory copy)
+        {
+            base.CopyBuilder(copy);
+            
+            copy.SSL.Protocols = connectionBuilder.SSL.Protocols;
+            copy.SSL.CheckCertificateRevocation = connectionBuilder.SSL.CheckCertificateRevocation;
+
+            if (connectionBuilder.SSL.ClientCertificates != null)
+            {
+                copy.SSL.ClientCertificates = new X509CertificateCollection(connectionBuilder.SSL.ClientCertificates);
+            }
+
+        }
+
+        protected override void CopyInto(TransportContext copy)
+        {
+            SecureTransportContext stcCopy = copy as SecureTransportContext;
+
+            // Copy Secure properties.
+
+            // copy keystore properties
+            stcCopy.KeyStoreName = this.KeyStoreName;
+            stcCopy.KeyStorePassword = this.KeyStorePassword;
+            stcCopy.KeyStoreLocation = this.KeyStoreLocation;
+
+            // copy certificate properties
+            stcCopy.AcceptInvalidBrokerCert = this.AcceptInvalidBrokerCert;
+            stcCopy.ServerName = this.ServerName;
+            stcCopy.ClientCertFileName = this.ClientCertFileName;
+            stcCopy.ClientCertPassword = this.ClientCertPassword;
+            stcCopy.ClientCertSubject = this.ClientCertSubject;
+
+            // copy application callback
+            stcCopy.ServerCertificateValidateCallback = this.ServerCertificateValidateCallback;
+            stcCopy.ClientCertificateSelectCallback = this.ClientCertificateSelectCallback;
+            
+            base.CopyInto(copy);
+
+            stcCopy.connectionBuilder.SSL.RemoteCertificateValidationCallback = this.ContextServerCertificateValidation;
+            stcCopy.connectionBuilder.SSL.LocalCertificateSelectionCallback = this.ContextLocalCertificateSelect;
+        }
+
+        public override IProviderTransportContext Copy()
+        {
+            TransportContext copy = new SecureTransportContext();
+            this.CopyInto(copy);
+            return copy;
+        }
+        
+        IProviderSecureTransportContext IProviderSecureTransportContext.Copy()
+        {
+            return this.Copy() as SecureTransportContext;
+        }
+
+        #endregion
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs b/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs
new file mode 100644
index 0000000..24b5bbf
--- /dev/null
+++ b/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Net.Security;
+
+namespace Apache.NMS.AMQP.Transport.Secure
+{
+    internal interface IProviderSecureTransportContext : IProviderTransportContext
+    {
+        
+        new IProviderSecureTransportContext Copy();
+
+        string ServerName { get; set; }
+
+        string ClientCertFileName { get; set; }
+
+        string ClientCertSubject { get; set; }
+
+        string ClientCertPassword { get; set; }
+
+        string KeyStoreName { get; set; }
+        
+        string KeyStoreLocation { get; set; }
+
+        bool AcceptInvalidBrokerCert { get; set; }
+
+        string SSLProtocol { get; set; }
+        
+        RemoteCertificateValidationCallback ServerCertificateValidateCallback { get; set; }
+
+        LocalCertificateSelectionCallback ClientCertificateSelectCallback { get; set; }
+
+        bool CheckCertificateRevocation { get; set; }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/AtomicSequence.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/AtomicSequence.cs b/src/main/csharp/Util/AtomicSequence.cs
new file mode 100644
index 0000000..fd59a94
--- /dev/null
+++ b/src/main/csharp/Util/AtomicSequence.cs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Util
+{
+    /// <summary>
+    /// Simple utility class used mainly for Id generation.
+    /// </summary>
+    class AtomicSequence : Atomic<ulong>
+    {
+        public AtomicSequence() : base()
+        {
+        }
+
+        public AtomicSequence(ulong defaultValue) : base(defaultValue)
+        {
+        }
+
+        public ulong getAndIncrement()
+        {
+            ulong val = 0;
+            lock (this)
+            {
+                val = atomicValue;
+                atomicValue++;
+            }
+            return val;
+        }
+
+        public override string ToString()
+        {
+            return Value.ToString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/DispatchExecutor.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/DispatchExecutor.cs b/src/main/csharp/Util/DispatchExecutor.cs
new file mode 100644
index 0000000..65fea61
--- /dev/null
+++ b/src/main/csharp/Util/DispatchExecutor.cs
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS.Util;
+using Apache.NMS;
+
+
+
+namespace Apache.NMS.AMQP.Util
+{
+
+    internal delegate void Executable();
+
+    internal interface IExecutable
+    {
+        Executable GetExecutable();
+        void OnFailure(Exception e);
+    }
+
+    internal interface IWaitable
+    {
+        bool IsComplete { get; }
+
+        bool IsCancelled { get; }
+
+        bool Wait();
+        bool Wait(TimeSpan timeout);
+    }
+
+    /// <summary>
+    /// General Dispatch Event to execute code from a DispatchExecutor.
+    /// </summary>
+    class DispatchEvent : IExecutable
+    {
+        private Executable exe;
+
+        public virtual Executable Callback
+        {
+            get { return exe; }
+            protected  set { exe = value; }
+        }
+
+        internal DispatchEvent() : this(null) { }
+
+        internal DispatchEvent(Executable e) { exe = e; }
+
+        public virtual void OnFailure(Exception e)
+        {
+            Tracer.WarnFormat("Encountered Exception: {0} stack: {1}.", e.Message, e.StackTrace);
+        }
+
+        public Executable GetExecutable()
+        {
+            return exe;
+        }
+    }
+
+    #region Waitable Dispatcher Event Class
+    /// <summary>
+    /// A Dispatch event that is completion aware of its current state. This allows threads other then the Dispatch Executor thread to synchronize with the dispatch event.
+    /// </summary>
+    internal class WaitableDispatchEvent : DispatchEvent, IWaitable
+    {
+        private ManualResetEvent handle = new ManualResetEvent(false);
+
+        /* state:
+            0 = Not Signaled,
+            1 = Signaled,
+            2 = Cancelled
+        */
+        #region EventState
+        protected class EventState
+        {
+            internal static EventState INITIAL = new EventState(0, "INITIAL");
+            internal static EventState SIGNALED = new EventState(1, "SIGNALED");
+            internal static EventState CANCELLED = new EventState(2, "CANCELLED");
+            internal static EventState UNKNOWN = new EventState(-1, "UNKNOWN");
+
+            private static Dictionary<int, EventState> States = new Dictionary<int, EventState>()
+            {
+                { INITIAL.value, INITIAL },
+                { SIGNALED.value, SIGNALED },
+                { CANCELLED.value, CANCELLED }
+            };
+
+            
+
+            private readonly int value;
+            private readonly string name;
+            private EventState(int ordinal, string name = null)
+            {
+                value = ordinal;
+                this.name = name ?? ordinal.ToString();
+            }
+
+            public static implicit operator int(EventState es)
+            {
+                return es != null ? es.value : UNKNOWN.value;
+            }
+
+            public static implicit operator EventState(int value)
+            {
+                if(!States.TryGetValue(value, out EventState state))
+                {
+                    state = UNKNOWN;
+                }
+
+                return state;
+            }
+
+            public override int GetHashCode()
+            {
+                return this.value;
+            }
+
+            public override bool Equals(object obj)
+            {
+                if(obj != null && obj is EventState)
+                {
+                    return this.value == (obj as EventState).value;
+                }
+                return false;
+            }
+
+            public override string ToString()
+            {
+                return this.name;
+            }
+        }
+        #endregion
+
+        private Atomic<int> state;
+
+        private Exception failureCause = null;
+
+        public override Executable Callback
+        {
+            get => base.Callback;
+            protected set
+            {
+                Executable cb;
+
+                if (value == null)
+                {
+                    cb = () =>
+                    {
+                        this.Release();
+                    };
+                }
+                else
+                {
+                    cb = () =>
+                    {
+                        value.Invoke();
+                        this.Release();
+                    };
+                }
+
+                base.Callback = cb;
+            }
+        }
+
+        public bool IsComplete => EventState.SIGNALED.Equals(state.Value);
+
+        public bool IsCancelled => EventState.CANCELLED.Equals(state.Value);
+
+        internal WaitableDispatchEvent() : this(null)
+        {
+        }
+
+        internal WaitableDispatchEvent(Executable e) 
+        {
+            handle = new ManualResetEvent(false);
+            state = new Atomic<int>(EventState.INITIAL);
+            
+            this.Callback = e;
+        }
+
+        public void Reset()
+        {
+            state.GetAndSet(EventState.INITIAL);
+            if (!handle.Reset())
+            {
+                throw new NMSException("Failed to reset Waitable Event Signal.");
+            }
+        }
+
+        public void Cancel()
+        {
+            if (state.CompareAndSet(EventState.INITIAL, EventState.CANCELLED))
+            {
+                if (!handle.Set())
+                {
+                    failureCause = new NMSException("Failed to cancel Waitable Event.");
+                }
+            }
+        }
+
+        private void Release()
+        {
+            if (state.CompareAndSet(EventState.INITIAL, EventState.SIGNALED))
+            {
+                
+                if (!handle.Set())
+                {
+                    state.GetAndSet(EventState.CANCELLED);
+                    failureCause =  new NMSException("Failed to release Waitable Event.");
+                }
+            }
+        }
+
+        public bool Wait()
+        {
+            return Wait(TimeSpan.Zero);
+        }
+
+        public bool Wait(TimeSpan timeout)
+        {
+            bool signaled = (timeout.Equals(TimeSpan.Zero)) ? handle.WaitOne() : handle.WaitOne(timeout);
+            if (state.Value == EventState.CANCELLED)
+            {
+                signaled = false;
+                if (failureCause != null)
+                {
+                    throw failureCause;
+                }
+            }
+            return signaled;
+        }
+    }
+
+    #endregion
+
+    /// <summary>
+    /// Single Thread Executor for Dispatch Event. This Encapsulates Threading restrictions for Client code serialization.
+    /// </summary>
+    class DispatchExecutor : NMSResource, IDisposable
+    {
+        private static AtomicSequence ExecutorId = new AtomicSequence(1);
+        private const string ExecutorName = "DispatchExecutor";
+        
+        private const int DEFAULT_SIZE = 100000;
+        private const int DEFAULT_DEQUEUE_TIMEOUT = 10000;
+        private Queue<IExecutable> queue;
+        private int maxSize;
+        private bool closed=false;
+        private Atomic<bool> closeQueued = new Atomic<bool>(false);
+        private bool executing=false;
+        private Semaphore suspendLock = new Semaphore(0, 10, "Suspend");
+        private Thread executingThread;
+        private readonly string name;
+        private readonly object objLock = new object();
+
+        #region Constructors
+
+        public DispatchExecutor() : this(DEFAULT_SIZE) { }
+
+        public DispatchExecutor(bool drain) : this(DEFAULT_SIZE, drain) { }
+
+        public DispatchExecutor(int size, bool drain = false)
+        {
+            this.maxSize = size;
+            this.ExecuteDrain = drain;
+            queue = new Queue<IExecutable>(maxSize);
+            executingThread = new Thread(new ThreadStart(this.Dispatch));
+            executingThread.IsBackground = true;
+            name = ExecutorName + ExecutorId.getAndIncrement() + ":" + executingThread.ManagedThreadId;
+            executingThread.Name = name;
+        }
+
+        ~DispatchExecutor()
+        {
+            try
+            {
+                Dispose(false);
+            }
+            catch (Exception ex)
+            {
+                Tracer.DebugFormat("Caught exception in Finalizer for Dispatcher : {0}. Exception {1}", this.name, ex);
+            }
+        }
+
+        #endregion
+
+        #region Properties
+
+        protected object ThisLock { get { return objLock; } }
+
+        protected bool Closing { get { return closeQueued.Value; } }
+
+        public string Name { get { return name; } }
+
+        internal bool ExecuteDrain { get; private set; }
+
+        internal bool IsOnDispatchThread
+        {
+            get
+            {
+                string currentThreadName = Thread.CurrentThread.Name;
+                return currentThreadName != null && currentThreadName.Equals(name);
+            }
+        }
+
+        #endregion
+
+        #region Private Suspend Resume Methods
+
+#if TRACELOCKS
+        int scount = 0;
+#endif
+
+        protected void Suspend()
+        {
+            Exception e=null;
+            while(!AcquireSuspendLock(out e) && !closed)
+            {
+                if (e != null)
+                {
+                    throw e;
+                }
+            }
+        }
+
+        protected bool AcquireSuspendLock()
+        {
+            Exception e;
+            return AcquireSuspendLock(out e);
+        }
+
+        protected bool AcquireSuspendLock(out Exception ex)
+        {
+            bool signaled = false;
+            ex = null;
+            try
+            {
+#if TRACELOCKS
+                Tracer.InfoFormat("Aquiring Suspend Lock Count {0}", scount);
+#endif
+                signaled = this.suspendLock.WaitOne();
+#if TRACELOCKS
+                scount = signaled ? scount - 1 : scount;
+#endif
+            }catch(Exception e)
+            {
+                ex = e;
+            }
+#if TRACELOCKS
+            finally
+            {
+                Tracer.InfoFormat("Suspend Lock Count after aquire {0} signaled {1}", scount, signaled);
+            }
+#endif
+            return signaled;
+        }
+
+        protected void Resume()
+        {
+            Exception ex;
+            int count = ReleaseSuspendLock(out ex);
+            if (ex != null)
+            {
+                throw ex;
+            }
+        }
+
+        protected int ReleaseSuspendLock()
+        {
+            Exception e;
+            return ReleaseSuspendLock(out e);
+        }
+
+        protected int ReleaseSuspendLock(out Exception ex)
+        {
+            ex = null;
+            int previous = -1;
+            try
+            {
+#if TRACELOCKS
+                Tracer.InfoFormat("Suspend Lock Count before release {0}", scount);
+#endif
+                previous = this.suspendLock.Release();
+#if TRACELOCKS
+                scount = previous != -1 ? scount + 1 : scount;
+                Tracer.InfoFormat("Suspend Lock Count after release {0} previous Value {1}", scount, previous);
+#endif
+            }
+            catch (SemaphoreFullException sfe)
+            {
+                // ignore multiple resume calls
+                // Log for debugging
+                Tracer.DebugFormat("Multiple Resume called on running Dispatcher. Cause: {0}", sfe.Message);
+                
+            }
+            catch(System.IO.IOException ioe)
+            {
+                Tracer.ErrorFormat("Failed resuming or starting Dispatch thread. Cause: {0}", ioe.Message);
+                ex = ioe;
+            }
+            catch(UnauthorizedAccessException uae)
+            {
+                Tracer.Error(uae.StackTrace);
+                ex =  uae;
+            }
+            if(ex!=null)
+                Console.WriteLine("Release Error {0}", ex);
+            return previous;
+        }
+
+        #endregion
+
+        #region Protected Dispatch Methods
+
+        protected void CloseOnQueue()
+        {
+            bool ifDrain = false;
+            bool executeDrain = false;
+            lock (queue)
+            {
+                if (!closed)
+                {
+                    Stop();
+                    closed = true;
+                    executing = false;
+                    ifDrain = true;
+                    executeDrain = ExecuteDrain;
+                    Monitor.PulseAll(queue);
+                    Tracer.InfoFormat("DistpachExecutor: {0} Closed.", name);
+                }
+            
+            }
+            if (ifDrain)
+            {
+                // Drain the rest of the queue before closing
+                Drain(executeDrain);
+            }
+        }
+
+        protected IExecutable[] DrainOffQueue()
+        {
+            lock (queue)
+            {
+                ArrayList list = new ArrayList(queue.Count);
+                while (queue.Count > 0)
+                {
+                    list.Add(queue.Dequeue());
+                }
+                return (IExecutable[])list.ToArray(typeof(IExecutable));
+            }
+        }
+
+        protected void Drain(bool execute = false)
+        {
+            IExecutable[] exes = DrainOffQueue();
+            if (execute)
+            {
+                foreach (IExecutable exe in exes)
+                {
+                    DispatchEvent(exe);
+                }
+            }
+        }
+
+        protected void DispatchEvent(IExecutable dispatchEvent)
+        {
+            Executable exe = dispatchEvent.GetExecutable();
+            if (exe != null)
+            {
+                try
+                {
+                    exe.Invoke();
+                }
+                catch (Exception e)
+                {
+                    // connect to exception listener here.
+                    dispatchEvent.OnFailure(ExceptionSupport.Wrap(e, "Dispatch Executor Error ({0}):", this.name));
+                    
+                }
+            }
+        }
+
+        protected void Dispatch()
+        {
+            while (!closed)
+            {
+                bool locked = false;
+                while (!closed && !(locked = this.AcquireSuspendLock())) { }
+                if (locked)
+                {
+                    int count = this.ReleaseSuspendLock();
+#if TraceLocks
+                    Tracer.InfoFormat("Dispatch Suspend Lock Count {0}, Current Count {1}", count, count+1);
+#endif
+                }
+                if (closed)
+                {
+                    break;
+                }
+
+                while (IsStarted)
+                {
+                    
+                    IExecutable exe;
+                    if (TryDequeue(out exe, DEFAULT_DEQUEUE_TIMEOUT))
+                    {
+                        
+                        DispatchEvent(exe);
+                    }
+                    else
+                    {
+                        // queue stopped or timed out
+                        Tracer.DebugFormat("Queue {0} did not dispatch due to being Suspended or Closed.", name);
+                    }
+                }
+
+            }
+
+        }
+
+#endregion
+
+        #region NMSResource Methods
+
+        protected override void StartResource()
+        {
+            if (!executing)
+            {
+                executing = true;
+                executingThread.Start();
+                Resume();
+            }
+            else
+            {
+                Resume();
+            }
+        }
+
+        protected override void StopResource()
+        {
+            
+            lock (queue)
+            {
+                if (queue.Count == 0)
+                {
+                    Monitor.PulseAll(queue);
+                }
+            }
+            
+            Suspend();
+        }
+
+        protected override void ThrowIfClosed()
+        {
+            if (closed)
+            {
+                throw new Apache.NMS.IllegalStateException("Illegal Operation on closed " + this.GetType().Name + ".");
+            }
+        }
+        
+        #endregion
+        
+        #region Public Methods
+
+        /// <summary>
+        /// Closes the Dispatch Executor. See <see cref="DispatchExecutor.Shutdown(bool)"/>.
+        /// </summary>
+        public void Close()
+        {
+            this.Dispose(true);
+            //this.Shutdown(true);
+        }
+
+        public void Enqueue(IExecutable o)
+        {
+            if(o == null)
+            {
+                return;
+            }
+            lock (queue)
+            {
+                while (queue.Count >= maxSize)
+                {
+                    if (closed)
+                    {
+                        return;
+                    }
+                    Monitor.Wait(queue);
+                }
+                queue.Enqueue(o);
+                if (queue.Count == 1)
+                {
+                    Monitor.PulseAll(queue);
+                }
+                
+            }
+        }
+
+        public bool TryDequeue(out IExecutable exe, int timeout = -1)
+        {
+            exe = null;
+            lock (queue)
+            {
+                bool signaled = true;
+                while (queue.Count == 0)
+                {
+                    if (closed || mode.Value.Equals(Resource.Mode.Stopping))
+                    {
+                        return false;
+                    }
+                    signaled = (timeout > -1 ) ? Monitor.Wait(queue, timeout) : Monitor.Wait(queue);
+                }
+                if (!signaled)
+                {
+                    return false;
+                }
+
+                exe = queue.Dequeue();
+                if (queue.Count == maxSize - 1)
+                {
+                    Monitor.PulseAll(queue);
+                }
+                
+            }
+            return true;
+        }
+
+        #endregion
+
+        #region IDispose Methods
+
+        /// <summary>
+        /// Shudowns down the dispatch Thread.
+        /// </summary>
+        /// <param name="join">
+        /// True, indicates whether the shutdown is orderly and therfore can block to join the thread.
+        /// False, indicates the shutdown can not block.
+        /// Default value is False.
+        /// </param>
+        internal void Shutdown(bool join = false)
+        {
+            if (IsOnDispatchThread)
+            {
+                // close is called in the Dispatcher Thread so we can just close
+                if (false == closeQueued.GetAndSet(true))
+                {
+                    this.CloseOnQueue();
+                }
+            }
+            else if (closeQueued.CompareAndSet(false, true))
+            {
+                if (!IsStarted && executing)
+                {
+                    // resume dispatching thread for Close Message Dispatch Event
+                    Start();
+                }
+                // enqueue close
+                this.Enqueue(new DispatchEvent(this.CloseOnQueue));
+
+                if (join && executingThread != null)
+                {
+                    // thread join must not happen under lock (queue) statement
+                    if (!executingThread.ThreadState.Equals(ThreadState.Unstarted))
+                    {
+                        executingThread.Join();
+                    }
+                    executingThread = null;
+                }
+
+            }
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+            if (closed) return;
+            lock (queue)
+            {
+                if (closed) return;
+            }
+            if (disposing)
+            {
+                // remove reference to dispatcher to be garbage collected
+                if (executingThread != null && executingThread.ThreadState.Equals(ThreadState.Unstarted))
+                {
+                    executingThread = null;
+                }
+                this.Shutdown(true);
+                this.suspendLock.Dispose();
+                this.queue = null;
+            }
+            else
+            {
+                this.Shutdown();
+                this.suspendLock.Dispose();
+                this.queue = null;
+            }
+        }
+
+        public void Dispose()
+        {
+            try
+            {
+                this.Close();
+            }
+            catch (Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception during Dispose for Dispatcher {0}. Exception {1}", this.name, ex);
+            }
+        }
+
+        #endregion
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/ExceptionSupport.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/ExceptionSupport.cs b/src/main/csharp/Util/ExceptionSupport.cs
new file mode 100644
index 0000000..8d93564
--- /dev/null
+++ b/src/main/csharp/Util/ExceptionSupport.cs
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Reflection;
+using Apache.NMS;
+using Amqp.Framing;
+using Amqp;
+
+namespace Apache.NMS.AMQP.Util
+{
+    class ExceptionSupport
+    {
+        private static readonly Dictionary<string, Type> errTypeMap;
+        
+        static ExceptionSupport()
+        {
+            // mapping of amqp .Net Lite error code to NMS exception type
+
+            errTypeMap = new Dictionary<string, Type>();
+            errTypeMap.Add(NMSErrorCode.CONNECTION_TIME_OUT, typeof(NMSConnectionException));
+            errTypeMap.Add(ErrorCode.ConnectionRedirect, typeof(NMSConnectionException));
+            errTypeMap.Add(ErrorCode.ConnectionForced, typeof(NMSConnectionException));
+            errTypeMap.Add(ErrorCode.IllegalState, typeof(IllegalStateException));
+
+            errTypeMap.Add(NMSErrorCode.INTERNAL_ERROR, typeof(NMSException));
+            errTypeMap.Add(NMSErrorCode.UNKNOWN_ERROR, typeof(NMSException));
+            errTypeMap.Add(NMSErrorCode.SESSION_TIME_OUT, typeof(NMSException));
+            errTypeMap.Add(NMSErrorCode.LINK_TIME_OUT, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.DecodeError, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.DetachForced, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.ErrantLink, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.FrameSizeTooSmall, typeof(NMSConnectionException));
+            errTypeMap.Add(ErrorCode.FramingError, typeof(NMSConnectionException));
+            errTypeMap.Add(ErrorCode.HandleInUse, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.InternalError, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.InvalidField, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.LinkRedirect, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.MessageReleased, typeof(IllegalStateException));
+            errTypeMap.Add(ErrorCode.MessageSizeExceeded, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.NotAllowed, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.NotFound, typeof(InvalidDestinationException));
+            errTypeMap.Add(ErrorCode.NotImplemented, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.PreconditionFailed, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.ResourceDeleted, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.ResourceLimitExceeded, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.ResourceLocked, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.Stolen, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.TransactionRollback, typeof(TransactionRolledBackException));
+            errTypeMap.Add(ErrorCode.TransactionTimeout, typeof(TransactionInProgressException));
+            errTypeMap.Add(ErrorCode.TransactionUnknownId, typeof(TransactionRolledBackException));
+            errTypeMap.Add(ErrorCode.TransferLimitExceeded, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.UnattachedHandle, typeof(NMSException));
+            errTypeMap.Add(ErrorCode.UnauthorizedAccess, typeof(NMSSecurityException));
+            errTypeMap.Add(ErrorCode.WindowViolation, typeof(NMSException));
+            
+        }
+
+        private static FieldInfo[] GetConstants(Type type)
+        {
+            ArrayList list = new ArrayList();
+            FieldInfo[] fields = type.GetFields(
+                BindingFlags.Static | 
+                BindingFlags.Public | 
+                BindingFlags.FlattenHierarchy
+                );
+            foreach (FieldInfo field in fields)
+            {
+                if (field.IsLiteral && !field.IsInitOnly)
+                    list.Add(field);
+            }
+            return (FieldInfo[])list.ToArray(typeof(FieldInfo));
+        }
+
+        private static string[] GetStringConstants(Type type)
+        {
+            FieldInfo[] fields = GetConstants(type);
+            ArrayList list = new ArrayList(fields.Length);
+            foreach(FieldInfo fi in fields)
+            {
+                if (fi.FieldType.Equals(typeof(string)))
+                {
+                    list.Add(fi.GetValue(null));
+                }
+            }
+            return (string[])list.ToArray(typeof(string));
+        }
+
+        public static NMSException GetTimeoutException(IAmqpObject obj, string format, params object[] args)
+        {
+            return GetTimeoutException(obj, string.Format(format, args));
+        }
+
+        public static NMSException GetTimeoutException(IAmqpObject obj, string message)
+        {
+            Error e = null;
+            if (obj is Amqp.Connection)
+            {
+                e = NMSError.CONNECTION_TIMEOUT;
+            }
+            else if (obj is Amqp.Session)
+            {
+                e = NMSError.SESSION_TIMEOUT;
+            }
+            else if (obj is Amqp.Link)
+            {
+                e = NMSError.LINK_TIMEOUT;
+            }
+            
+            return GetException(e, message);
+
+        }
+
+        public static NMSException GetException(IAmqpObject obj, string format, params object[] args)
+        {
+            return GetException(obj, string.Format(format, args));
+        }
+
+        public static NMSException GetException(Error amqpErr, string format, params object[] args)
+        {
+            return GetException(amqpErr, string.Format(format, args));
+        }
+
+        public static NMSException GetException(IAmqpObject obj, string message="")
+        {
+            return GetException(obj.Error, message);
+        }
+
+        public static NMSException GetException(Error amqpErr, string message = "", Exception e = null)
+        {
+            string errCode = null;
+            string errMessage = null;
+            string additionalErrInfo = null;
+            if (amqpErr == null)
+            {
+                amqpErr = NMSError.INTERNAL;
+            }
+
+            errCode = amqpErr.Condition.ToString();
+            errMessage = amqpErr.Description;
+
+            errMessage = errMessage != null ? ", Description = " + errMessage : "";
+
+            if (amqpErr.Info != null && amqpErr.Info.Count > 0)
+            {
+                additionalErrInfo = ", ErrorInfo = " + Types.ConversionSupport.ToString(amqpErr.Info);
+            }
+            if (null == e)
+            {
+                // no exception given, create a NMSunthrownException to hold the 
+                // stack and use it as the innerException in the constructors for
+                // the NMSexception we create., the custom StackTrace() will allow exception listeners to
+                // see the stack to here
+                e = new NMSProviderError(errCode, errMessage);
+            }
+            NMSException ex = null;
+            Type exType = null;
+            if(errTypeMap.TryGetValue(errCode, out exType))
+            {
+                ConstructorInfo ci = exType.GetConstructor(new[] { typeof(string), typeof(string), typeof(Exception) });
+                object inst = ci.Invoke(new object[] { message + errMessage + additionalErrInfo , errCode, e });
+                ex = inst as NMSException;
+            }
+            else
+            {
+                ex = new NMSException(message + errMessage + additionalErrInfo, errCode, e);
+            }
+            return ex;
+            
+        }
+        
+        public static NMSException Wrap(Exception e, string format, params object[] args)
+        {
+            return Wrap(e, string.Format(format, args));
+        }
+
+        public static NMSException Wrap(Exception e, string message = "")
+        {
+            if(e == null)
+            {
+                return null;
+            }
+            NMSException nmsEx = null;
+            string exMessage = message;
+            if (exMessage == null || exMessage.Length == 0)
+            {
+                exMessage = e.Message;
+            }
+            if (e is NMSException)
+            {
+                return e as NMSException;
+            }
+            else if (e is AmqpException)
+            {
+                Error err = (e as AmqpException).Error;
+                nmsEx = GetException(err, message, e);
+                Tracer.DebugFormat("Encoutered AmqpException {0} and created NMS Exception {1}.", e, nmsEx);
+            }
+            else
+            {
+                nmsEx = new NMSException(exMessage, NMSErrorCode.INTERNAL_ERROR, e);
+            }
+            
+            return nmsEx;
+        }
+
+    }
+
+    #region Exceptions
+
+
+    public class InvalidPropertyException : NMSException
+    {
+        protected static string ExFormat = "Invalid Property {0}. Cause: {1}";
+
+        public InvalidPropertyException(string property, string message) : base(string.Format(ExFormat, property, message))
+        {
+            exceptionErrorCode = NMSErrorCode.PROPERTY_ERROR;
+        }
+    }
+    // The API converts AMPQ Errors to NMSProviderError. This is typically added to
+    // the Exception queue and passed to the ExceptionListener.  As the Exception is
+    // instantiated but never thrown, it does not have an exception-stack trace. We add
+    // one to the private member InstanceTrace and override StackTrace so useful 
+    // information can be displayed.
+
+    internal class NMSProviderError : NMSException
+    {
+        private string InstanceTrace;
+
+        public NMSProviderError() : base()
+        {
+            System.Diagnostics.StackTrace trace = new System.Diagnostics.StackTrace(1, true);
+            InstanceTrace = trace.ToString();
+        }
+
+        public NMSProviderError(string message) : base(message)
+        {
+            System.Diagnostics.StackTrace trace = new System.Diagnostics.StackTrace(1, true);
+            InstanceTrace = trace.ToString();
+        }
+
+        public NMSProviderError(string message, string errorCode) : base(message, errorCode)
+        {
+            System.Diagnostics.StackTrace trace = new System.Diagnostics.StackTrace(1, true);
+            InstanceTrace = trace.ToString();
+        }
+
+        public NMSProviderError(string message, NMSException innerException) : base(message, innerException)
+        {
+            System.Diagnostics.StackTrace trace = new System.Diagnostics.StackTrace(1, true);
+            InstanceTrace = trace.ToString();
+            exceptionErrorCode = innerException.ErrorCode ?? NMSErrorCode.INTERNAL_ERROR;
+        }
+
+        public NMSProviderError(string message, Exception innerException) : base(message, innerException)
+        {
+            System.Diagnostics.StackTrace trace = new System.Diagnostics.StackTrace(1, true);
+            InstanceTrace = trace.ToString();
+        }
+
+        public NMSProviderError(string message, string errorCode, Exception innerException) : base(message, errorCode, innerException)
+        {
+            System.Diagnostics.StackTrace trace = new System.Diagnostics.StackTrace(1, true);
+            InstanceTrace = trace.ToString();
+        }
+
+        public override string StackTrace
+        {
+            get
+            {
+                string stack = base.StackTrace;
+                if (stack == null || stack.Length == 0)
+                {
+                    stack = InstanceTrace;
+                }
+                if (InnerException != null && (InnerException.StackTrace != null && InnerException.StackTrace.Length > 0))
+                {
+                    stack += "\nCause " + InnerException.Message + " : \n" +
+                                 InnerException.StackTrace;
+
+                }
+                return stack;
+            }
+        }
+    }
+
+    #endregion
+
+    #region Error Codes
+
+    internal static class NMSError 
+    {
+        public static Error SESSION_TIMEOUT = new Error(NMSErrorCode.SESSION_TIME_OUT) { Description = "Session Begin Request has timed out." };
+        public static Error CONNECTION_TIMEOUT = new Error(NMSErrorCode.SESSION_TIME_OUT) {  Description = "Connection Open Request has timed out." };
+        public static Error LINK_TIMEOUT = new Error(NMSErrorCode.SESSION_TIME_OUT) { Description = "Link Attach Request has timed out." };
+        public static Error PROPERTY = new Error(NMSErrorCode.PROPERTY_ERROR) {  Description = "Property Error." };
+        public static Error UNKNOWN = new Error(NMSErrorCode.UNKNOWN_ERROR) { Description = "Unknown Error." };
+        public static Error INTERNAL = new Error(NMSErrorCode.INTERNAL_ERROR) { Description = "Internal Error." };
+
+    }
+    internal static class NMSErrorCode
+    {
+        public static string CONNECTION_TIME_OUT = "nms:connection:timout";
+        public static string SESSION_TIME_OUT = "nms:session:timout";
+        public static string LINK_TIME_OUT = "nms:link:timeout";
+        public static string PROPERTY_ERROR = "nms:property:error";
+        public static string UNKNOWN_ERROR = "nms:unknown";
+        public static string INTERNAL_ERROR = "nms:internal";
+    }
+
+    #endregion
+}