You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/08/27 21:15:45 UTC
[08/15] activemq-nms-amqp git commit: AMQNET-575: NMS AMQP Client
Rework Add an NMS API implementation that wraps the AMQPnetLite .NET API.
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/TemporaryLink.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/TemporaryLink.cs b/src/main/csharp/TemporaryLink.cs
new file mode 100644
index 0000000..fbd011c
--- /dev/null
+++ b/src/main/csharp/TemporaryLink.cs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using Amqp;
+using Amqp.Framing;
+
+namespace Apache.NMS.AMQP
+{
+ /// <summary>
+ /// A <see cref="MessageLink"/> that attaches a sender link whose target is flagged as dynamic,
+ /// and records the address returned by the peer as the name of the
+ /// <see cref="TemporaryDestination"/> it was created for.
+ /// </summary>
+ internal class TemporaryLink : MessageLink
+ {
+     // These values never vary per instance, so they are compile-time constants
+     // rather than readonly instance fields.
+     private const string TOPIC = "Topic";
+     private const string QUEUE = "Queue";
+     private const string CREATOR_TOPIC = "temp-topic-creator";
+     private const string CREATOR_QUEUE = "temp-queue-creator";
+
+     /// <summary>
+     /// Creates the link for <paramref name="destination"/> on <paramref name="session"/> and
+     /// adopts the request timeout configured on the session's connection.
+     /// </summary>
+     /// <param name="session">Session that will own the link.</param>
+     /// <param name="destination">Temporary destination this link represents.</param>
+     internal TemporaryLink(Session session, TemporaryDestination destination) : base(session, destination)
+     {
+         Info = new TemporaryLinkInfo(destination.DestinationId);
+         this.RequestTimeout = session.Connection.RequestTimeout;
+     }
+
+     /// <summary>The link's destination viewed as a temporary destination (null if it is not one).</summary>
+     private TemporaryDestination TemporaryDestination { get => Destination as TemporaryDestination; }
+
+     private bool IsTopic { get => TemporaryDestination.IsTopic; }
+
+     /// <summary>String form of the destination id; used as the suffix of the link name.</summary>
+     private string LocalDestinationName { get => TemporaryDestination.DestinationId.ToString(); }
+
+     private string DestinationTypeName { get => IsTopic ? TOPIC : QUEUE; }
+
+     /// <summary>
+     /// Attach callback handed to the sender link. Copies the target address from the attach
+     /// response (when one is present) into the destination name, then signals the response.
+     /// </summary>
+     private void OnAttachResponse(ILink link, Attach attachResponse)
+     {
+         Tracer.InfoFormat("Received attach response for Temporary creator link. Link = {0}, Attach = {1}", link.Name, attachResponse);
+         Target target = (attachResponse.Target as Amqp.Framing.Target);
+         if (target?.Address != null)
+         {
+             this.TemporaryDestination.DestinationName = target.Address;
+         }
+         this.OnResponse();
+     }
+
+     /// <summary>Builds the (empty) source terminus for the attach frame.</summary>
+     private Source CreateSource()
+     {
+         return new Source();
+     }
+
+     /// <summary>
+     /// Builds the target terminus: non-durable, dynamic, expiring on link detach and carrying
+     /// a delete-on-close lifetime policy in its dynamic node properties.
+     /// </summary>
+     private Target CreateTarget()
+     {
+         Target result = new Target();
+         result.Durable = (uint)TerminusDurability.NONE;
+
+         result.Capabilities = new[] { SymbolUtil.GetTerminusCapabilitiesForDestination(Destination) };
+         result.Dynamic = true;
+
+         result.ExpiryPolicy = SymbolUtil.ATTACH_EXPIRY_POLICY_LINK_DETACH;
+         Amqp.Types.Fields dnp = new Amqp.Types.Fields();
+         dnp.Add(
+             SymbolUtil.ATTACH_DYNAMIC_NODE_PROPERTY_LIFETIME_POLICY,
+             SymbolUtil.DELETE_ON_CLOSE
+         );
+         result.DynamicNodeProperties = dnp;
+
+         return result;
+     }
+
+     /// <summary>Builds the attach frame from the source and target termini above.</summary>
+     private Attach CreateAttach()
+     {
+         return new Attach()
+         {
+             Source = CreateSource(),
+             Target = CreateTarget(),
+             SndSettleMode = SenderSettleMode.Unsettled,
+             RcvSettleMode = ReceiverSettleMode.First,
+         };
+     }
+
+     /// <summary>
+     /// Creates the underlying sender link. The link name is "apache-nms:" followed by the
+     /// creator tag for the destination type and the destination id.
+     /// </summary>
+     protected override ILink CreateLink()
+     {
+         Amqp.Session parentImpl = this.Session.InnerSession as Amqp.Session;
+         string linkDestinationName = "apache-nms:" + ((IsTopic) ? CREATOR_TOPIC : CREATOR_QUEUE) + LocalDestinationName;
+         SenderLink link = new SenderLink(parentImpl, linkDestinationName, CreateAttach(), OnAttachResponse);
+         return link;
+     }
+
+     /// <summary>
+     /// Runs the base close handling and then signals the response.
+     /// NOTE(review): presumably this releases a caller still waiting for the attach
+     /// response when the link closes first - confirm against MessageLink.
+     /// </summary>
+     protected override void OnInternalClosed(IAmqpObject sender, Error error)
+     {
+         base.OnInternalClosed(sender, error);
+         this.OnResponse();
+     }
+
+     /// <summary>Shuts the link down and removes the temporary destination from the connection.</summary>
+     internal override void Shutdown()
+     {
+         base.Shutdown();
+         this.Session.Connection.Remove(this.TemporaryDestination);
+     }
+
+     /// <summary>Nothing to stop for this link type.</summary>
+     protected override void StopResource()
+     {
+     }
+ }
+
+ /// <summary>
+ /// Link information for a TemporaryLink. It adds nothing to <see cref="LinkInfo"/>
+ /// beyond forwarding the identifier to the base constructor.
+ /// </summary>
+ internal class TemporaryLinkInfo : LinkInfo
+ {
+     /// <param name="id">Identifier passed through to <see cref="LinkInfo"/>.</param>
+     public TemporaryLinkInfo(Id id) : base(id) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Topic.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Topic.cs b/src/main/csharp/Topic.cs
new file mode 100644
index 0000000..4db4ee6
--- /dev/null
+++ b/src/main/csharp/Topic.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS;
+
+namespace Apache.NMS.AMQP
+{
+ /// <summary>
+ /// Apache.NMS.AMQP.Topic implements Apache.NMS.ITopic.
+ /// Topic is a concrete implementation of the abstract Destination.
+ /// </summary>
+ class Topic : Destination, ITopic
+ {
+     /// <summary>
+     /// Creates a topic named <paramref name="topicString"/> that belongs to <paramref name="conn"/>.
+     /// </summary>
+     internal Topic(Connection conn, string topicString)
+         : base(conn, topicString, false)
+     {
+     }
+
+     /// <summary>Always <see cref="DestinationType.Topic"/>.</summary>
+     public override DestinationType DestinationType => DestinationType.Topic;
+
+     /// <summary>The destination name, exposed under its ITopic name.</summary>
+     public string TopicName => destinationName;
+
+     /// <summary>No name validation is performed for topics.</summary>
+     protected override void ValidateName(string name)
+     {
+     }
+
+     /// <summary>Defers entirely to the base class.</summary>
+     protected override void Dispose(bool disposing) => base.Dispose(disposing);
+ }
+
+ /// <summary>
+ /// Apache.NMS.AMQP.TemporaryTopic implements Apache.NMS.ITemporaryTopic.
+ /// TemporaryTopic is a concrete implementation of the abstract TemporaryDestination.
+ /// </summary>
+ class TemporaryTopic : TemporaryDestination, ITemporaryTopic
+ {
+     /// <summary>
+     /// Creates a temporary topic identified by a fresh id taken from the connection's
+     /// temporary-topic id generator.
+     /// </summary>
+     internal TemporaryTopic(Connection conn)
+         : base(conn, conn.TemporaryTopicGenerator.GenerateId(), false)
+     {
+     }
+
+     /// <summary>Creates a temporary topic with an explicitly supplied destination name.</summary>
+     internal TemporaryTopic(Connection conn, string destinationName)
+         : base(conn, destinationName, false)
+     {
+     }
+
+     /// <summary>Always <see cref="DestinationType.TemporaryTopic"/>.</summary>
+     public override DestinationType DestinationType => DestinationType.TemporaryTopic;
+
+     /// <summary>The destination name, exposed under its ITopic name.</summary>
+     public string TopicName => destinationName;
+
+     /// <summary>No name validation is performed for temporary topics.</summary>
+     protected override void ValidateName(string name)
+     {
+     }
+
+     /// <summary>Defers entirely to the base class.</summary>
+     public override void Delete() => base.Delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/AMQP/TransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/AMQP/TransportContext.cs b/src/main/csharp/Transport/AMQP/TransportContext.cs
new file mode 100644
index 0000000..2991ae9
--- /dev/null
+++ b/src/main/csharp/Transport/AMQP/TransportContext.cs
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Net.Sockets;
+using System.Collections.Specialized;
+using Apache.NMS.AMQP.Util;
+
+namespace Apache.NMS.AMQP.Transport.AMQP
+{
+
+ #region Transport Context
+
+ /// <summary>
+ /// Transport management is mainly handled by the AmqpNetLite library, except for custom transports.
+ /// TransportContext should configure the Amqp.ConnectionFactory for the tcp transport properties.
+ /// </summary>
+ internal class TransportContext : IProviderTransportContext
+ {
+     // NMS connection factory that owns this context.
+     protected ConnectionFactory factory = null;
+
+     // AmqpNetLite connection factory whose TCP/AMQP/SASL settings this context edits.
+     protected Amqp.ConnectionFactory connectionBuilder = null;
+
+     /// <summary>
+     /// Wraps the AmqpNetLite factory held by <paramref name="connectionFactory"/> and
+     /// selects the anonymous SASL profile on it.
+     /// </summary>
+     internal TransportContext(ConnectionFactory connectionFactory)
+     {
+         factory = connectionFactory;
+         connectionBuilder = factory.Factory as Amqp.ConnectionFactory;
+         connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
+     }
+
+     // Copy Constructor: leaves both fields unset; CopyInto fills them in.
+     protected TransportContext() { }
+
+     public Apache.NMS.IConnectionFactory Owner { get { return factory; } }
+
+     #region Transport Options
+
+     // Each option below forwards straight to the AmqpNetLite TCP settings.
+     public int ReceiveBufferSize { get => this.connectionBuilder.TCP.ReceiveBufferSize; set => this.connectionBuilder.TCP.ReceiveBufferSize = value; }
+     public int ReceiveTimeout { get => this.connectionBuilder.TCP.ReceiveTimeout; set => this.connectionBuilder.TCP.ReceiveTimeout = value; }
+     public int SendBufferSize { get => this.connectionBuilder.TCP.SendBufferSize; set => this.connectionBuilder.TCP.SendBufferSize = value; }
+     public int SendTimeout { get => this.connectionBuilder.TCP.SendTimeout; set => this.connectionBuilder.TCP.SendTimeout = value; }
+     public bool TcpNoDelayEnabled { get => this.connectionBuilder.TCP.NoDelay; set => this.connectionBuilder.TCP.NoDelay = value; }
+     public uint TcpKeepAliveTime { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveTime = value; }
+     public uint TcpKeepAliveInterval { get => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval; set => this.connectionBuilder.TCP.KeepAlive.KeepAliveInterval = value; }
+
+     /// <summary>
+     /// Whether the socket linger option is enabled. Reading or writing this property
+     /// installs a disabled, zero-second linger option first when none has been set.
+     /// </summary>
+     public bool SocketLingerEnabled
+     {
+         get => EnsureLingerOption().Enabled;
+         set => EnsureLingerOption().Enabled = value;
+     }
+
+     /// <summary>
+     /// The socket linger time. Reading or writing this property installs a disabled,
+     /// zero-second linger option first when none has been set.
+     /// </summary>
+     public int SocketLingerTime
+     {
+         get => EnsureLingerOption().LingerTime;
+         set => EnsureLingerOption().LingerTime = value;
+     }
+
+     /// <summary>
+     /// Returns the TCP linger option, creating <c>new LingerOption(false, 0)</c> when it is
+     /// still null. The null check must be on LingerOption itself: the previous getters
+     /// guarded TCP instead and threw NullReferenceException while the option was unset.
+     /// </summary>
+     private LingerOption EnsureLingerOption()
+     {
+         return this.connectionBuilder.TCP.LingerOption
+             ?? (this.connectionBuilder.TCP.LingerOption = new LingerOption(false, 0));
+     }
+
+     /// <summary>
+     /// UseLogging Enables AmqpNetLite's Frame logging level.
+     /// The flag lives in the static Amqp.Trace.TraceLevel, so it is shared by every context.
+     /// </summary>
+     public bool UseLogging
+     {
+         get => ((Amqp.Trace.TraceLevel & Amqp.TraceLevel.Frame) == Amqp.TraceLevel.Frame);
+         set
+         {
+             if (value)
+             {
+                 Amqp.Trace.TraceLevel = Amqp.Trace.TraceLevel | Amqp.TraceLevel.Frame;
+             }
+             else
+             {
+                 Amqp.Trace.TraceLevel = Amqp.Trace.TraceLevel & ~Amqp.TraceLevel.Frame;
+             }
+         }
+     }
+
+     #endregion
+
+     /// <summary>Returns a delegate bound to the AmqpNetLite factory's CreateAsync.</summary>
+     public virtual ProviderCreateConnection CreateConnectionBuilder()
+     {
+         return new ProviderCreateConnection(connectionBuilder.CreateAsync);
+     }
+
+     /// <summary>Creates a new context that carries a copy of this context's settings.</summary>
+     public virtual IProviderTransportContext Copy()
+     {
+         TransportContext copy = new TransportContext();
+         this.CopyInto(copy);
+         return copy;
+     }
+
+     /// <summary>
+     /// Shares the owning factory with <paramref name="copy"/> and gives it a fresh
+     /// AmqpNetLite factory populated by <see cref="CopyBuilder"/>.
+     /// </summary>
+     protected virtual void CopyInto(TransportContext copy)
+     {
+         copy.factory = this.factory;
+         // UseLogging is backed by static state, so this assignment leaves it unchanged.
+         copy.UseLogging = this.UseLogging;
+         Amqp.ConnectionFactory builder = new Amqp.ConnectionFactory();
+         this.CopyBuilder(builder);
+         copy.connectionBuilder = builder;
+     }
+
+     /// <summary>
+     /// Copies the AMQP and TCP settings (through PropertyUtil) and the SASL profile
+     /// from this context's factory onto <paramref name="copy"/>.
+     /// </summary>
+     protected virtual void CopyBuilder(Amqp.ConnectionFactory copy)
+     {
+         StringDictionary amqpProperties = PropertyUtil.GetProperties(this.connectionBuilder.AMQP);
+         StringDictionary tcpProperties = PropertyUtil.GetProperties(this.connectionBuilder.TCP);
+         PropertyUtil.SetProperties(copy.AMQP, amqpProperties);
+         PropertyUtil.SetProperties(copy.TCP, tcpProperties);
+         copy.SASL.Profile = this.connectionBuilder.SASL.Profile;
+     }
+
+ }
+
+ #endregion
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/IProviderTransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/IProviderTransportContext.cs b/src/main/csharp/Transport/IProviderTransportContext.cs
new file mode 100644
index 0000000..9ccfd64
--- /dev/null
+++ b/src/main/csharp/Transport/IProviderTransportContext.cs
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.AMQP.Transport
+{
+ /// <summary>
+ /// Contract for a transport context: a holder of transport options that can produce a
+ /// connection-creation delegate and can be copied.
+ /// </summary>
+ internal interface IProviderTransportContext
+ {
+ /// <summary>The NMS connection factory this context belongs to.</summary>
+ Apache.NMS.IConnectionFactory Owner { get; }
+
+ #region Transport Options
+
+ /// <summary>Receive buffer size of the transport (TransportContext forwards it to the AmqpNetLite TCP settings).</summary>
+ int ReceiveBufferSize { get; set; }
+
+ /// <summary>Receive timeout of the transport (TransportContext forwards it to the AmqpNetLite TCP settings).</summary>
+ int ReceiveTimeout { get; set; }
+
+ /// <summary>Send buffer size of the transport (TransportContext forwards it to the AmqpNetLite TCP settings).</summary>
+ int SendBufferSize { get; set; }
+
+ /// <summary>Send timeout of the transport (TransportContext forwards it to the AmqpNetLite TCP settings).</summary>
+ int SendTimeout { get; set; }
+
+ /// <summary>Turns frame-level tracing on or off (TransportContext maps it to AmqpNetLite's Frame trace level).</summary>
+ bool UseLogging { get; set; }
+
+ #endregion
+
+ /// <summary>Returns the delegate used to create the underlying connection.</summary>
+ ProviderCreateConnection CreateConnectionBuilder();
+
+ /// <summary>Returns a new context carrying a copy of this context's settings.</summary>
+ IProviderTransportContext Copy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs b/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs
new file mode 100644
index 0000000..f3b5549
--- /dev/null
+++ b/src/main/csharp/Transport/Secure/AMQP/SecureTransportContext.cs
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using System.Security.Cryptography.X509Certificates;
+using System.Net.Security;
+using System.Security.Authentication;
+using System.Collections.Generic;
+using System.Collections;
+using System.Threading.Tasks;
+
+using Apache.NMS;
+
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.AMQP.Transport.AMQP;
+using System.Collections.Specialized;
+
+namespace Apache.NMS.AMQP.Transport.Secure.AMQP
+{
+
+ /// <summary>
+ /// Secure Transport management is mainly handled by the AmqpNetLite library, except for certificate selection and validation.
+ /// SecureTransportContext should configure the Amqp.ConnectionFactory for the ssl transport properties.
+ /// </summary>
+ internal class SecureTransportContext : TransportContext, IProviderSecureTransportContext
+ {
+
+ private readonly static List<string> SupportedProtocols;
+ private readonly static Dictionary<string, int> SupportedProtocolValues;
+
+ #region static Initializer
+
+ /// <summary>
+ /// Builds the list of selectable protocol names and the lower-cased name-to-value table
+ /// from the <see cref="SslProtocols"/> enumeration, skipping its Default and None members.
+ /// </summary>
+ static SecureTransportContext()
+ {
+     const string Default = "Default";
+     const string None = "None";
+     SupportedProtocols = new List<string>();
+     SupportedProtocolValues = new Dictionary<string, int>();
+
+     foreach (string name in Enum.GetNames(typeof(System.Security.Authentication.SslProtocols)))
+     {
+         // Enum member names are identifiers, so compare them ordinally rather than by culture.
+         bool excluded = name.Equals(Default, StringComparison.OrdinalIgnoreCase)
+             || name.Equals(None, StringComparison.OrdinalIgnoreCase);
+         if (!excluded)
+         {
+             SupportedProtocols.Add(name);
+         }
+     }
+
+     foreach (int value in Enum.GetValues(typeof(System.Security.Authentication.SslProtocols)))
+     {
+         SslProtocols protocol = (SslProtocols)value;
+         if (protocol.Equals(SslProtocols.Default) || protocol.Equals(SslProtocols.None))
+         {
+             continue;
+         }
+
+         // Use the indexer, not Add: enum members that share a value are returned more than
+         // once, and a duplicate-key exception here would make the whole type unusable.
+         // Lower-case invariantly so the keys do not depend on the current culture.
+         SupportedProtocolValues[protocol.ToString().ToLowerInvariant()] = value;
+     }
+
+     if (Tracer.IsDebugEnabled)
+     {
+         Tracer.DebugFormat("Supported SSL protocols list {0}", Util.PropertyUtil.ToString(SupportedProtocols));
+     }
+ }
+
+ #endregion
+
+
+ #region Constructors
+
+ /// <summary>
+ /// Creates the secure context for <paramref name="factory"/>, selects the anonymous SASL
+ /// profile and routes the SSL certificate callbacks through this instance.
+ /// </summary>
+ internal SecureTransportContext(ConnectionFactory factory) : base(factory)
+ {
+     connectionBuilder.SASL.Profile = Amqp.Sasl.SaslProfile.Anonymous;
+
+     var sslSettings = this.connectionBuilder.SSL;
+     sslSettings.LocalCertificateSelectionCallback = this.ContextLocalCertificateSelect;
+     sslSettings.RemoteCertificateValidationCallback = this.ContextServerCertificateValidation;
+ }
+
+ // Copy Constructor: creates an empty context that CopyInto fills in afterwards.
+ protected SecureTransportContext() : base()
+ {
+ }
+
+ #endregion
+
+ #region Secure Transport Context Properties
+
+ /// <summary>Name of the certificate store to load client certificates from.</summary>
+ public string KeyStoreName { get; set; }
+
+ /// <summary>Key store password (copied between contexts).</summary>
+ public string KeyStorePassword { get; set; }
+
+ /// <summary>File to load the client certificate from; when set, the store is not consulted.</summary>
+ public string ClientCertFileName { get; set; }
+
+ /// <summary>Result returned by the default server certificate validation when policy errors remain.</summary>
+ public bool AcceptInvalidBrokerCert { get; set; } = false;
+
+ /// <summary>Subject used to pick one certificate out of the loaded client certificates.</summary>
+ public string ClientCertSubject { get; set; }
+
+ /// <summary>Password used when loading <see cref="ClientCertFileName"/>.</summary>
+ public string ClientCertPassword { get; set; }
+
+ /// <summary>Name of the store location to open the key store in.</summary>
+ public string KeyStoreLocation { get; set; }
+
+ /// <summary>
+ /// The SSL protocols configured on the underlying factory, as text. Null is returned
+ /// while there is no underlying factory.
+ /// </summary>
+ public string SSLProtocol
+ {
+     get => this.connectionBuilder == null ? null : this.connectionBuilder.SSL.Protocols.ToString();
+     set => this.connectionBuilder.SSL.Protocols = GetSslProtocols(value);
+ }
+
+ /// <summary>
+ /// Whether certificate revocation is checked. Reads as false and ignores writes while
+ /// there is no underlying factory.
+ /// </summary>
+ public bool CheckCertificateRevocation
+ {
+     get => this.connectionBuilder != null && this.connectionBuilder.SSL.CheckCertificateRevocation;
+     set
+     {
+         if (this.connectionBuilder == null)
+         {
+             return;
+         }
+         this.connectionBuilder.SSL.CheckCertificateRevocation = value;
+     }
+ }
+
+ /// <summary>Common name expected in the broker certificate when the TLS host name does not match.</summary>
+ public string ServerName { get; set; }
+
+ /// <summary>Optional application callback that takes over server certificate validation.</summary>
+ public RemoteCertificateValidationCallback ServerCertificateValidateCallback { get; set; }
+
+ /// <summary>Optional application callback that takes over client certificate selection.</summary>
+ public LocalCertificateSelectionCallback ClientCertificateSelectCallback { get; set; }
+
+ #endregion
+
+ #region Private Methods
+ // These are the default values given by amqpnetlite.
+ private static readonly SslProtocols DefaultProtocols = (new Amqp.ConnectionFactory()).SSL.Protocols;
+
+
+ /// <summary>
+ /// Converts a protocol string into <see cref="SslProtocols"/>.
+ /// </summary>
+ /// <param name="protocolString">Text to parse, case-insensitively; null or blank selects the defaults.</param>
+ /// <returns>The parsed protocols, or <c>DefaultProtocols</c> for null or blank input.</returns>
+ /// <exception cref="InvalidPropertyException">The text cannot be parsed.</exception>
+ private SslProtocols GetSslProtocols(string protocolString)
+ {
+     if (String.IsNullOrWhiteSpace(protocolString))
+     {
+         return DefaultProtocols;
+     }
+
+     SslProtocols parsed;
+     bool recognised = Enum.TryParse(protocolString, true, out parsed);
+     if (!recognised)
+     {
+         throw new InvalidPropertyException(SecureTransportPropertyInterceptor.SSL_PROTOCOLS_PROPERTY, string.Format("Failed to parse value {0}", protocolString));
+     }
+
+     return parsed;
+ }
+
+ /// <summary>
+ /// Loads the candidate client certificates: the single certificate from
+ /// <see cref="ClientCertFileName"/> when that is set, otherwise every certificate in the
+ /// configured store (store name defaults to My, location defaults to CurrentUser).
+ /// </summary>
+ /// <returns>The loaded certificates; possibly empty.</returns>
+ /// <exception cref="NMSException">KeyStoreLocation is not a StoreLocation name, or the store cannot be read.</exception>
+ private X509Certificate2Collection LoadClientCertificates()
+ {
+     X509Certificate2Collection certificates = new X509Certificate2Collection();
+
+     if (!String.IsNullOrWhiteSpace(this.ClientCertFileName))
+     {
+         Tracer.DebugFormat("Attempting to load Client Certificate file: {0}", this.ClientCertFileName);
+         X509Certificate2 certificate = new X509Certificate2(this.ClientCertFileName, this.ClientCertPassword);
+         Tracer.DebugFormat("Loaded Client Certificate: {0}", certificate.Subject);
+
+         certificates.Add(certificate);
+         return certificates;
+     }
+
+     string storeName = String.IsNullOrWhiteSpace(this.KeyStoreName) ? StoreName.My.ToString() : this.KeyStoreName;
+     StoreLocation storeLocation = StoreLocation.CurrentUser;
+     if (!String.IsNullOrWhiteSpace(this.KeyStoreLocation))
+     {
+         bool found = false;
+         foreach (string location in Enum.GetNames(typeof(StoreLocation)))
+         {
+             // Enum names are identifiers: compare ordinally so the match does not depend
+             // on the current culture's casing rules.
+             if (String.Equals(this.KeyStoreLocation, location, StringComparison.OrdinalIgnoreCase))
+             {
+                 storeLocation = (StoreLocation)Enum.Parse(typeof(StoreLocation), location, true);
+                 found = true;
+                 break;
+             }
+         }
+         if (!found)
+         {
+             throw new NMSException(string.Format("Invalid Store location {0}", this.KeyStoreLocation), NMSErrorCode.PROPERTY_ERROR);
+         }
+     }
+
+     Tracer.DebugFormat("Loading store {0}, from location {1}.", storeName, storeLocation.ToString());
+     try
+     {
+         X509Store store = new X509Store(storeName, storeLocation);
+         try
+         {
+             store.Open(OpenFlags.ReadOnly);
+             // Take the store's certificate collection once and add it in one call.
+             certificates.AddRange(store.Certificates);
+         }
+         finally
+         {
+             // Always release the store handle, including when Open or the copy fails.
+             store.Close();
+         }
+     }
+     catch (Exception ex)
+     {
+         Tracer.WarnFormat("Error loading KeyStore, name : {0}; location : {1}. Cause {2}", storeName, storeLocation, ex);
+         throw ExceptionSupport.Wrap(ex, "Error loading KeyStore.", storeName, storeLocation.ToString());
+     }
+
+     return certificates;
+ }
+
+ /// <summary>
+ /// Creates the connection by invoking the delegate produced by the base
+ /// <c>CreateConnectionBuilder</c> with the supplied arguments.
+ /// </summary>
+ private Task<Amqp.Connection> CreateSecureConnection(Amqp.Address addr, Amqp.Framing.Open open, Amqp.OnOpened onOpened)
+ {
+     ProviderCreateConnection createConnection = base.CreateConnectionBuilder();
+     Task<Amqp.Connection> pending = createConnection(addr, open, onOpened);
+     return pending;
+ }
+
+ #endregion
+
+ #region IProviderSecureTransportContext Methods
+
+ /// <summary>
+ /// Adds the loaded client certificates to the SSL settings, rejects a protocol selection
+ /// of None, and returns a delegate that creates the secure connection.
+ /// </summary>
+ /// <exception cref="NMSSecurityException">The configured SSL protocol set is None.</exception>
+ public override ProviderCreateConnection CreateConnectionBuilder()
+ {
+     // Load local certificates
+     var clientCertificates = this.connectionBuilder.SSL.ClientCertificates;
+     clientCertificates.AddRange(LoadClientCertificates());
+     int candidates = this.connectionBuilder.SSL.ClientCertificates.Count;
+     string pluralSuffix = (this.connectionBuilder.SSL.ClientCertificates.Count == 1) ? "y" : "ies";
+     Tracer.DebugFormat("Loading Certificates from {0} possibilit{1}.", candidates, pluralSuffix);
+
+     // log assigned SSL protocols
+     Tracer.DebugFormat("Set accepted SSL protocols to {0}.", this.SSLProtocol);
+
+     bool noProtocolSelected = this.connectionBuilder.SSL.Protocols == SslProtocols.None;
+     if (noProtocolSelected)
+     {
+         throw new NMSSecurityException(string.Format("Invalid SSL Protocol {0} selected from system supported protocols {1}", this.SSLProtocol, PropertyUtil.ToString(SupportedProtocols)));
+     }
+
+     return new ProviderCreateConnection(this.CreateSecureConnection);
+ }
+
+ #endregion
+
+ #region Certificate Callbacks
+
+ /// <summary>
+ /// Local certificate selection callback installed on the SSL settings.
+ /// When <see cref="ClientCertificateSelectCallback"/> is set the decision is delegated to it.
+ /// Otherwise the first local certificate is chosen, unless <see cref="ClientCertSubject"/>
+ /// is set, in which case the certificate with that subject is chosen.
+ /// </summary>
+ /// <returns>The selected certificate, or null when none could be selected.</returns>
+ protected X509Certificate ContextLocalCertificateSelect(object sender, string targetHost, X509CertificateCollection localCertificates, X509Certificate remoteCertificate, string[] acceptableIssuers)
+ {
+     if (Tracer.IsDebugEnabled)
+     {
+         StringBuilder subjects = new StringBuilder("{");
+         StringBuilder issuers = new StringBuilder("{");
+
+         foreach (X509Certificate cert in localCertificates)
+         {
+             subjects.Append(cert.Subject).Append(", ");
+             issuers.Append(cert.Issuer).Append(", ");
+         }
+
+         subjects.Append("}");
+         issuers.Append("}");
+
+         // Tolerate a null issuer array: tracing must never be the reason selection fails.
+         string acceptedIssuers = "{" + String.Join(", ", acceptableIssuers ?? new string[0]) + "}";
+
+         Tracer.DebugFormat("Local Certificate Selection.\n" +
+             "Sender {0}, Target Host {1}, Remote Cert Subject {2}, Remote Cert Issuer {3}" +
+             "\nlocal Cert Subjects {4}, " +
+             "\nlocal Cert Issuers {5}, " +
+             "\nacceptable Issuers {6}",
+             sender?.ToString(),
+             targetHost,
+             remoteCertificate?.Subject,
+             remoteCertificate?.Issuer,
+             subjects.ToString(),
+             issuers.ToString(),
+             acceptedIssuers);
+     }
+
+     X509Certificate localCertificate = null;
+     if (ClientCertificateSelectCallback != null)
+     {
+         try
+         {
+             if (Tracer.IsDebugEnabled)
+             {
+                 Tracer.DebugFormat("Calling application callback for Local certificate selection.");
+             }
+             localCertificate = ClientCertificateSelectCallback(sender, targetHost, localCertificates, remoteCertificate, acceptableIssuers);
+         }
+         catch (Exception ex)
+         {
+             Tracer.InfoFormat("Caught Exception from application callback for local certificate selction. Exception : {0}", ex);
+             // Rethrow without resetting the stack trace of the application's exception.
+             throw;
+         }
+     }
+     else if (localCertificates.Count >= 1)
+     {
+         if (String.IsNullOrWhiteSpace(this.ClientCertSubject))
+         {
+             // No subject requested: take the first certificate.
+             localCertificate = localCertificates[0];
+         }
+         else
+         {
+             // The application asked for a specific certificate: search for its subject.
+             foreach (X509Certificate cert in localCertificates)
+             {
+                 if (String.Compare(cert.Subject, this.ClientCertSubject, true) == 0)
+                 {
+                     localCertificate = cert;
+                     break;
+                 }
+             }
+         }
+     }
+
+     if (localCertificate == null)
+     {
+         Tracer.InfoFormat("Could not select Local Certificate for target host {0}", targetHost);
+     }
+     else if (Tracer.IsDebugEnabled)
+     {
+         Tracer.DebugFormat("Selected Local Certificate {0}", localCertificate.ToString());
+     }
+
+     return localCertificate;
+ }
+
+ /// <summary>
+ /// Remote certificate validation callback installed on the SSL settings.
+ /// When <see cref="ServerCertificateValidateCallback"/> is set the decision is delegated to it.
+ /// Otherwise a name-mismatch error is forgiven when <see cref="ServerName"/> equals the CN of
+ /// the certificate subject, and the certificate is accepted when no policy errors remain.
+ /// With errors remaining the result is <see cref="AcceptInvalidBrokerCert"/>.
+ /// </summary>
+ /// <returns>True to accept the remote certificate, false to reject it.</returns>
+ protected bool ContextServerCertificateValidation(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
+ {
+     if (Tracer.IsDebugEnabled)
+     {
+         string name = null;
+         if (certificate is X509Certificate2 cert)
+         {
+             name = cert.SubjectName.Name;
+         }
+         Tracer.DebugFormat("Cert DN {0}; Cert Subject {1}; Cert Issuer {2}; SSLPolicyErrors [{3}]", name, certificate?.Subject ?? "null", certificate?.Issuer ?? "null", sslPolicyErrors.ToString());
+         try
+         {
+             X509VerificationFlags verFlags = chain.ChainPolicy.VerificationFlags;
+             X509RevocationMode revMode = chain.ChainPolicy.RevocationMode;
+             X509RevocationFlag revFlags = chain.ChainPolicy.RevocationFlag;
+             StringBuilder sb = new StringBuilder();
+             sb.Append("ChainStatus={");
+             int size = sb.Length;
+             foreach (X509ChainStatus status in chain.ChainStatus)
+             {
+                 X509ChainStatusFlags csflags = status.Status;
+                 sb.AppendFormat("Info={0}; flags=0x{1:X}; flagNames=[{2}]", status.StatusInformation, csflags, csflags.ToString());
+                 sb.Append(", ");
+             }
+             if (size != sb.Length)
+             {
+                 // Drop the trailing ", " left by the last entry.
+                 sb.Remove(sb.Length - 2, 2);
+             }
+             sb.Append("}");
+
+             Tracer.DebugFormat("X.509 Cert Chain, Verification Flags {0:X} {1}, Revocation Mode {2}, Revocation Flags {3}, Status {4} ",
+                 verFlags, verFlags.ToString(), revMode.ToString(), revFlags.ToString(), sb.ToString());
+         }
+         catch (Exception ex)
+         {
+             // Tracing is best effort; a failure here must not affect validation.
+             Tracer.ErrorFormat("Error displaying Remote Cert fields. Cause: {0}", ex);
+         }
+     }
+
+     if (ServerCertificateValidateCallback != null)
+     {
+         try
+         {
+             if (Tracer.IsDebugEnabled)
+             {
+                 Tracer.DebugFormat("Calling application callback for Remote Certificate Validation.");
+             }
+             return ServerCertificateValidateCallback(sender, certificate, chain, sslPolicyErrors);
+         }
+         catch (Exception ex)
+         {
+             Tracer.InfoFormat("Caught Exception from application callback for Remote Certificate Validation. Exception : {0}", ex);
+             // Rethrow without resetting the stack trace of the application's exception.
+             throw;
+         }
+     }
+
+     bool nameMismatch = (sslPolicyErrors & SslPolicyErrors.RemoteCertificateNameMismatch) == SslPolicyErrors.RemoteCertificateNameMismatch;
+     if (nameMismatch
+         && !String.IsNullOrWhiteSpace(this.ServerName)
+         && SubjectHasCommonName(certificate?.Subject, this.ServerName))
+     {
+         sslPolicyErrors &= ~(SslPolicyErrors.RemoteCertificateNameMismatch);
+     }
+
+     if (sslPolicyErrors == SslPolicyErrors.None)
+     {
+         return true;
+     }
+
+     // The certificate can be null here (for example when the remote side sent none),
+     // so it must not be dereferenced unconditionally.
+     Tracer.WarnFormat("SSL certificate {0} validation error : {1}", certificate?.Subject ?? "null", sslPolicyErrors.ToString());
+     return this.AcceptInvalidBrokerCert;
+ }
+
+ /// <summary>
+ /// Tells whether one comma-separated component of <paramref name="subject"/> is exactly
+ /// "CN=<paramref name="commonName"/>" (ignoring case). A whole-component comparison is
+ /// required: a substring search would let "CN=host" match "CN=host.attacker.example".
+ /// </summary>
+ private static bool SubjectHasCommonName(string subject, string commonName)
+ {
+     if (String.IsNullOrEmpty(subject))
+     {
+         return false;
+     }
+
+     string expected = "CN=" + commonName;
+     foreach (string component in subject.Split(','))
+     {
+         if (String.Equals(component.Trim(), expected, StringComparison.OrdinalIgnoreCase))
+         {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ #endregion
+
+ #region Copy Methods
+
+
+ /// <summary>
+ /// Extends the base copy with the SSL settings: the protocols, the revocation-check flag
+ /// and, when present, a new collection holding the same client certificates.
+ /// </summary>
+ protected override void CopyBuilder(Amqp.ConnectionFactory copy)
+ {
+     base.CopyBuilder(copy);
+
+     var sourceSsl = connectionBuilder.SSL;
+     var targetSsl = copy.SSL;
+
+     targetSsl.Protocols = sourceSsl.Protocols;
+     targetSsl.CheckCertificateRevocation = sourceSsl.CheckCertificateRevocation;
+
+     X509CertificateCollection existing = sourceSsl.ClientCertificates;
+     if (existing == null)
+     {
+         return;
+     }
+     targetSsl.ClientCertificates = new X509CertificateCollection(existing);
+ }
+
+ /// <summary>
+ /// Copies the secure settings (key store, certificate options and application callbacks)
+ /// onto <paramref name="copy"/>, lets the base class copy the transport settings, and then
+ /// wires the copy's SSL callbacks to the copy itself.
+ /// </summary>
+ /// <param name="copy">Target context; must be a <see cref="SecureTransportContext"/>.</param>
+ /// <exception cref="ArgumentException"><paramref name="copy"/> is not a SecureTransportContext.</exception>
+ protected override void CopyInto(TransportContext copy)
+ {
+     SecureTransportContext stcCopy = copy as SecureTransportContext;
+     if (stcCopy == null)
+     {
+         // Fail with a clear message instead of a NullReferenceException further down.
+         throw new ArgumentException("Copy target must be a SecureTransportContext.", nameof(copy));
+     }
+
+     // Copy Secure properties.
+
+     // copy keystore properties
+     stcCopy.KeyStoreName = this.KeyStoreName;
+     stcCopy.KeyStorePassword = this.KeyStorePassword;
+     stcCopy.KeyStoreLocation = this.KeyStoreLocation;
+
+     // copy certificate properties
+     stcCopy.AcceptInvalidBrokerCert = this.AcceptInvalidBrokerCert;
+     stcCopy.ServerName = this.ServerName;
+     stcCopy.ClientCertFileName = this.ClientCertFileName;
+     stcCopy.ClientCertPassword = this.ClientCertPassword;
+     stcCopy.ClientCertSubject = this.ClientCertSubject;
+
+     // copy application callback
+     stcCopy.ServerCertificateValidateCallback = this.ServerCertificateValidateCallback;
+     stcCopy.ClientCertificateSelectCallback = this.ClientCertificateSelectCallback;
+
+     // The base call creates the copy's own connectionBuilder.
+     base.CopyInto(copy);
+
+     // Bind the callbacks to the copy, not to this instance, so that the copy validates
+     // and selects certificates using its own settings, including later changes to them.
+     stcCopy.connectionBuilder.SSL.RemoteCertificateValidationCallback = stcCopy.ContextServerCertificateValidation;
+     stcCopy.connectionBuilder.SSL.LocalCertificateSelectionCallback = stcCopy.ContextLocalCertificateSelect;
+ }
+
+ /// <summary>Creates a new secure context and fills it through <c>CopyInto</c>.</summary>
+ public override IProviderTransportContext Copy()
+ {
+     TransportContext duplicate = new SecureTransportContext();
+     CopyInto(duplicate);
+     return duplicate;
+ }
+
+ /// <summary>Explicit interface form of <c>Copy</c>, typed as the secure context interface.</summary>
+ IProviderSecureTransportContext IProviderSecureTransportContext.Copy()
+ {
+     IProviderTransportContext duplicate = this.Copy();
+     return duplicate as SecureTransportContext;
+ }
+
+ #endregion
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs b/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs
new file mode 100644
index 0000000..24b5bbf
--- /dev/null
+++ b/src/main/csharp/Transport/Secure/IProviderSecureTransportContext.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Net.Security;
+
+namespace Apache.NMS.AMQP.Transport.Secure
+{
+ /// <summary>
+ /// Transport context contract for secure connections. Extends
+ /// <see cref="IProviderTransportContext"/> with certificate, key store and
+ /// certificate-callback settings.
+ /// </summary>
+ internal interface IProviderSecureTransportContext : IProviderTransportContext
+ {
+
+ /// <summary>Copy operation typed as the secure context; hides the Copy() of the base interface.</summary>
+ new IProviderSecureTransportContext Copy();
+
+ // Presumably the server name expected of the broker certificate - confirm against the implementation.
+ string ServerName { get; set; }
+
+ // Presumably the file the client certificate is loaded from - confirm against the implementation.
+ string ClientCertFileName { get; set; }
+
+ // Presumably the subject used to look up the client certificate - confirm against the implementation.
+ string ClientCertSubject { get; set; }
+
+ // Presumably the password protecting the client certificate - confirm against the implementation.
+ string ClientCertPassword { get; set; }
+
+ // Key store name setting.
+ string KeyStoreName { get; set; }
+
+ // Key store location setting.
+ string KeyStoreLocation { get; set; }
+
+ // NOTE(review): name suggests invalid broker certificates are accepted when true - confirm.
+ bool AcceptInvalidBrokerCert { get; set; }
+
+ // SSL protocol selection expressed as a string; accepted values are not visible here.
+ string SSLProtocol { get; set; }
+
+ /// <summary>Application-supplied callback for validating the server certificate.</summary>
+ RemoteCertificateValidationCallback ServerCertificateValidateCallback { get; set; }
+
+ /// <summary>Application-supplied callback for selecting the client certificate.</summary>
+ LocalCertificateSelectionCallback ClientCertificateSelectCallback { get; set; }
+
+ // NOTE(review): presumably toggles certificate revocation checking - confirm.
+ bool CheckCertificateRevocation { get; set; }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/AtomicSequence.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/AtomicSequence.cs b/src/main/csharp/Util/AtomicSequence.cs
new file mode 100644
index 0000000..fd59a94
--- /dev/null
+++ b/src/main/csharp/Util/AtomicSequence.cs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.AMQP.Util
+{
+ /// <summary>
+ /// Simple utility class used mainly for Id generation.
+ /// </summary>
+ class AtomicSequence : Atomic<ulong>
+ {
+     /// <summary>Creates a sequence that starts at the base class's default value.</summary>
+     public AtomicSequence() : base()
+     {
+     }
+
+     /// <summary>Creates a sequence that starts at <paramref name="defaultValue"/>.</summary>
+     public AtomicSequence(ulong defaultValue) : base(defaultValue)
+     {
+     }
+
+     /// <summary>
+     /// Returns the current value and advances the sequence by one, both while
+     /// holding the lock on this instance.
+     /// </summary>
+     /// <returns>The value held before the increment.</returns>
+     public ulong getAndIncrement()
+     {
+         lock (this)
+         {
+             ulong current = atomicValue;
+             atomicValue = current + 1;
+             return current;
+         }
+     }
+
+     /// <summary>Formats the current value as a string.</summary>
+     public override string ToString()
+     {
+         ulong snapshot = Value;
+         return snapshot.ToString();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/DispatchExecutor.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/DispatchExecutor.cs b/src/main/csharp/Util/DispatchExecutor.cs
new file mode 100644
index 0000000..65fea61
--- /dev/null
+++ b/src/main/csharp/Util/DispatchExecutor.cs
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Apache.NMS.Util;
+using Apache.NMS;
+
+
+
+namespace Apache.NMS.AMQP.Util
+{
+
+ /// <summary>
+ /// Parameterless, void-returning unit of work that a dispatch event hands to the executor.
+ /// </summary>
+ internal delegate void Executable();
+
+ /// <summary>
+ /// An item that can be queued on a DispatchExecutor.
+ /// </summary>
+ internal interface IExecutable
+ {
+ /// <summary>Returns the delegate to invoke; DispatchExecutor skips the item when this is null.</summary>
+ Executable GetExecutable();
+ /// <summary>Called by DispatchExecutor with the (wrapped) exception when invoking the delegate throws.</summary>
+ void OnFailure(Exception e);
+ }
+
+ /// <summary>
+ /// Something another thread can block on until it is completed or cancelled.
+ /// </summary>
+ internal interface IWaitable
+ {
+ /// <summary>Whether the operation has completed.</summary>
+ bool IsComplete { get; }
+
+ /// <summary>Whether the operation was cancelled.</summary>
+ bool IsCancelled { get; }
+
+ // In WaitableDispatchEvent this overload delegates to Wait(TimeSpan.Zero), which waits without a timeout.
+ bool Wait();
+ // In WaitableDispatchEvent a zero timeout means "wait without a timeout"; the result is false on timeout or cancellation.
+ bool Wait(TimeSpan timeout);
+ }
+
+ /// <summary>
+ /// General Dispatch Event to execute code from a DispatchExecutor.
+ /// </summary>
+ class DispatchEvent : IExecutable
+ {
+     // Delegate handed to the executor; may be null.
+     private Executable exe;
+
+     /// <summary>The delegate this event runs. Subclasses may override to wrap the assigned value.</summary>
+     public virtual Executable Callback
+     {
+         get => exe;
+         protected set => exe = value;
+     }
+
+     /// <summary>Creates an event without a delegate.</summary>
+     internal DispatchEvent() : this(null) { }
+
+     /// <summary>Creates an event around <paramref name="e"/>.</summary>
+     internal DispatchEvent(Executable e)
+     {
+         // Stored in the field directly, i.e. without going through the virtual Callback setter.
+         exe = e;
+     }
+
+     /// <summary>Logs the failure as a warning.</summary>
+     public virtual void OnFailure(Exception e)
+     {
+         Tracer.WarnFormat("Encountered Exception: {0} stack: {1}.", e.Message, e.StackTrace);
+     }
+
+     /// <summary>Returns the stored delegate (possibly null).</summary>
+     public Executable GetExecutable() => exe;
+ }
+
+ #region Waitable Dispatcher Event Class
+ /// <summary>
+ /// A Dispatch event that is completion aware of its current state. This allows threads other than the Dispatch Executor thread to synchronize with the dispatch event.
+ /// </summary>
+ internal class WaitableDispatchEvent : DispatchEvent, IWaitable
+ {
+     // Signalled once the event has been released or cancelled. Allocated exactly once.
+     private readonly ManualResetEvent handle = new ManualResetEvent(false);
+
+     /* state:
+        0 = Not Signaled,
+        1 = Signaled,
+        2 = Cancelled
+     */
+     #region EventState
+     protected class EventState
+     {
+         internal static EventState INITIAL = new EventState(0, "INITIAL");
+         internal static EventState SIGNALED = new EventState(1, "SIGNALED");
+         internal static EventState CANCELLED = new EventState(2, "CANCELLED");
+         internal static EventState UNKNOWN = new EventState(-1, "UNKNOWN");
+
+         // Lookup used by the int -> EventState conversion; UNKNOWN is deliberately not registered.
+         private static Dictionary<int, EventState> States = new Dictionary<int, EventState>()
+         {
+             { INITIAL.value, INITIAL },
+             { SIGNALED.value, SIGNALED },
+             { CANCELLED.value, CANCELLED }
+         };
+
+         private readonly int value;
+         private readonly string name;
+
+         private EventState(int ordinal, string name = null)
+         {
+             value = ordinal;
+             this.name = name ?? ordinal.ToString();
+         }
+
+         /// <summary>Ordinal of the state; a null state converts to UNKNOWN's ordinal.</summary>
+         public static implicit operator int(EventState es)
+         {
+             return es != null ? es.value : UNKNOWN.value;
+         }
+
+         /// <summary>State registered for the ordinal, or UNKNOWN when there is none.</summary>
+         public static implicit operator EventState(int value)
+         {
+             if (!States.TryGetValue(value, out EventState state))
+             {
+                 state = UNKNOWN;
+             }
+
+             return state;
+         }
+
+         public override int GetHashCode()
+         {
+             return this.value;
+         }
+
+         // Only another EventState can be equal; a boxed int never is.
+         public override bool Equals(object obj)
+         {
+             if (obj != null && obj is EventState)
+             {
+                 return this.value == (obj as EventState).value;
+             }
+             return false;
+         }
+
+         public override string ToString()
+         {
+             return this.name;
+         }
+     }
+     #endregion
+
+     // Holds the ordinal of the current EventState.
+     private Atomic<int> state;
+
+     // Set when signalling the handle fails; rethrown from Wait when the event is cancelled.
+     private Exception failureCause = null;
+
+     /// <summary>
+     /// The delegate run by the executor. The assigned value is wrapped so that the
+     /// event is released after the delegate returns; a null value yields a delegate
+     /// that only releases the event.
+     /// </summary>
+     public override Executable Callback
+     {
+         get => base.Callback;
+         protected set
+         {
+             Executable cb;
+
+             if (value == null)
+             {
+                 cb = () =>
+                 {
+                     this.Release();
+                 };
+             }
+             else
+             {
+                 // NOTE(review): if value throws, Release() is skipped and waiters without a timeout stay blocked - confirm this is intended.
+                 cb = () =>
+                 {
+                     value.Invoke();
+                     this.Release();
+                 };
+             }
+
+             base.Callback = cb;
+         }
+     }
+
+     // The state is stored as an int, so compare ordinals. EventState.Equals(object) would
+     // receive a boxed int here and always answer false.
+     public bool IsComplete => state.Value == (int)EventState.SIGNALED;
+
+     public bool IsCancelled => state.Value == (int)EventState.CANCELLED;
+
+     internal WaitableDispatchEvent() : this(null)
+     {
+     }
+
+     internal WaitableDispatchEvent(Executable e)
+     {
+         state = new Atomic<int>(EventState.INITIAL);
+
+         this.Callback = e;
+     }
+
+     /// <summary>Puts the event back into the INITIAL state and resets the wait handle.</summary>
+     /// <exception cref="NMSException">The wait handle could not be reset.</exception>
+     public void Reset()
+     {
+         state.GetAndSet(EventState.INITIAL);
+         if (!handle.Reset())
+         {
+             throw new NMSException("Failed to reset Waitable Event Signal.");
+         }
+     }
+
+     /// <summary>Cancels the event if it is still INITIAL and wakes up any waiters.</summary>
+     public void Cancel()
+     {
+         if (state.CompareAndSet(EventState.INITIAL, EventState.CANCELLED))
+         {
+             if (!handle.Set())
+             {
+                 failureCause = new NMSException("Failed to cancel Waitable Event.");
+             }
+         }
+     }
+
+     // Marks the event SIGNALED if it is still INITIAL and wakes up any waiters.
+     private void Release()
+     {
+         if (state.CompareAndSet(EventState.INITIAL, EventState.SIGNALED))
+         {
+             if (!handle.Set())
+             {
+                 state.GetAndSet(EventState.CANCELLED);
+                 failureCause = new NMSException("Failed to release Waitable Event.");
+             }
+         }
+     }
+
+     /// <summary>Waits without a timeout. See <see cref="Wait(TimeSpan)"/>.</summary>
+     public bool Wait()
+     {
+         return Wait(TimeSpan.Zero);
+     }
+
+     /// <summary>
+     /// Blocks until the event is released or cancelled, or until the timeout elapses.
+     /// </summary>
+     /// <param name="timeout">Maximum time to wait; <see cref="TimeSpan.Zero"/> means wait without a timeout.</param>
+     /// <returns>True when the handle was signalled and the event was not cancelled; otherwise false.</returns>
+     public bool Wait(TimeSpan timeout)
+     {
+         bool signaled = (timeout.Equals(TimeSpan.Zero)) ? handle.WaitOne() : handle.WaitOne(timeout);
+         if (state.Value == (int)EventState.CANCELLED)
+         {
+             signaled = false;
+             if (failureCause != null)
+             {
+                 throw failureCause;
+             }
+         }
+         return signaled;
+     }
+ }
+
+ #endregion
+
+ /// <summary>
+ /// Single Thread Executor for Dispatch Event. This Encapsulates Threading restrictions for Client code serialization.
+ /// </summary>
+ class DispatchExecutor : NMSResource, IDisposable
+ {
+ // Source of the numeric part of each executor's name.
+ private static AtomicSequence ExecutorId = new AtomicSequence(1);
+ private const string ExecutorName = "DispatchExecutor";
+
+ private const int DEFAULT_SIZE = 100000;
+ private const int DEFAULT_DEQUEUE_TIMEOUT = 10000;
+ // Pending events; also used as the monitor for enqueue/dequeue/close.
+ private Queue<IExecutable> queue;
+ private int maxSize;
+ private bool closed=false;
+ private Atomic<bool> closeQueued = new Atomic<bool>(false);
+ private bool executing=false;
+ // Local (unnamed) semaphore: a named semaphore is a system-wide object, so every
+ // executor would end up sharing the same suspend lock.
+ private Semaphore suspendLock = new Semaphore(0, 10);
+ private Thread executingThread;
+ private readonly string name;
+ private readonly object objLock = new object();
+
+ #region Constructors
+
+ /// <summary>Creates an executor with the default queue size.</summary>
+ public DispatchExecutor() : this(DEFAULT_SIZE) { }
+
+ /// <summary>Creates an executor with the default queue size and the given drain setting.</summary>
+ public DispatchExecutor(bool drain) : this(DEFAULT_SIZE, drain) { }
+
+ /// <summary>
+ /// Creates the queue and the (not yet started) background dispatch thread.
+ /// </summary>
+ /// <param name="size">Maximum number of queued events; also used as the queue's initial capacity.</param>
+ /// <param name="drain">Stored in ExecuteDrain; passed to Drain when the executor closes.</param>
+ public DispatchExecutor(int size, bool drain = false)
+ {
+     maxSize = size;
+     ExecuteDrain = drain;
+     queue = new Queue<IExecutable>(maxSize);
+
+     Thread worker = new Thread(this.Dispatch) { IsBackground = true };
+     executingThread = worker;
+
+     // The thread name doubles as the executor name; IsOnDispatchThread compares against it.
+     name = ExecutorName + ExecutorId.getAndIncrement() + ":" + worker.ManagedThreadId;
+     worker.Name = name;
+ }
+
+ /// <summary>Finalizer: runs the non-blocking dispose path and traces any failure.</summary>
+ ~DispatchExecutor()
+ {
+     try
+     {
+         this.Dispose(false);
+     }
+     catch (Exception failure)
+     {
+         // Failures are only traced, never propagated out of the finalizer.
+         Tracer.DebugFormat("Caught exception in Finalizer for Dispatcher : {0}. Exception {1}", name, failure);
+     }
+ }
+
+ #endregion
+
+ #region Properties
+
+ /// <summary>Private lock object of this executor.</summary>
+ protected object ThisLock => objLock;
+
+ /// <summary>True once a close has been requested.</summary>
+ protected bool Closing => closeQueued.Value;
+
+ /// <summary>Executor name, which is also the name of its dispatch thread.</summary>
+ public string Name => name;
+
+ /// <summary>Whether events still queued at close time are executed rather than discarded.</summary>
+ internal bool ExecuteDrain { get; private set; }
+
+ /// <summary>True when the calling thread carries this executor's name.</summary>
+ internal bool IsOnDispatchThread
+ {
+     get
+     {
+         string current = Thread.CurrentThread.Name;
+         if (current == null)
+         {
+             return false;
+         }
+         return current.Equals(name);
+     }
+ }
+
+ #endregion
+
+ #region Private Suspend Resume Methods
+
+#if TRACELOCKS
+ // Diagnostic counter updated by the suspend-lock acquire/release helpers; only compiled when TRACELOCKS is defined.
+ int scount = 0;
+#endif
+
+ /// <summary>
+ /// Takes the suspend lock, retrying until it is acquired or the executor is closed.
+ /// An exception raised by an attempt is rethrown unless the executor is already closed.
+ /// </summary>
+ protected void Suspend()
+ {
+     for (;;)
+     {
+         Exception failure;
+         if (AcquireSuspendLock(out failure) || closed)
+         {
+             return;
+         }
+         if (failure != null)
+         {
+             throw failure;
+         }
+     }
+ }
+
+ /// <summary>Attempts to take the suspend lock, discarding any exception from the attempt.</summary>
+ /// <returns>True when the lock was acquired.</returns>
+ protected bool AcquireSuspendLock()
+ {
+     Exception ignored;
+     bool acquired = AcquireSuspendLock(out ignored);
+     return acquired;
+ }
+
+ /// <summary>
+ /// Waits on the suspend semaphore and reports, rather than throws, any exception.
+ /// </summary>
+ /// <param name="ex">Receives the exception thrown by the wait, or null when none was thrown.</param>
+ /// <returns>The result of the semaphore wait; false when the wait threw.</returns>
+ protected bool AcquireSuspendLock(out Exception ex)
+ {
+ bool signaled = false;
+ ex = null;
+ try
+ {
+#if TRACELOCKS
+ Tracer.InfoFormat("Aquiring Suspend Lock Count {0}", scount);
+#endif
+ // Blocks without a timeout until the semaphore can be entered.
+ signaled = this.suspendLock.WaitOne();
+#if TRACELOCKS
+ scount = signaled ? scount - 1 : scount;
+#endif
+ }catch(Exception e)
+ {
+ // Handed back to the caller through the out parameter instead of being thrown.
+ ex = e;
+ }
+#if TRACELOCKS
+ finally
+ {
+ Tracer.InfoFormat("Suspend Lock Count after aquire {0} signaled {1}", scount, signaled);
+ }
+#endif
+ return signaled;
+ }
+
+ /// <summary>Releases the suspend lock and rethrows any exception the release reported.</summary>
+ protected void Resume()
+ {
+     Exception failure;
+     ReleaseSuspendLock(out failure);
+     if (failure == null)
+     {
+         return;
+     }
+     throw failure;
+ }
+
+ /// <summary>Releases the suspend lock, discarding any exception reported by the release.</summary>
+ /// <returns>The semaphore count before the release, or -1 when the release did not succeed.</returns>
+ protected int ReleaseSuspendLock()
+ {
+     Exception ignored;
+     int previous = ReleaseSuspendLock(out ignored);
+     return previous;
+ }
+
+ /// <summary>
+ /// Releases the suspend semaphore once and reports, rather than throws, I/O and
+ /// access failures. A release on an already full semaphore is ignored.
+ /// </summary>
+ /// <param name="ex">Receives the IOException or UnauthorizedAccessException raised by the release, otherwise null.</param>
+ /// <returns>The semaphore count before the release, or -1 when the release did not succeed.</returns>
+ protected int ReleaseSuspendLock(out Exception ex)
+ {
+     ex = null;
+     int previous = -1;
+     try
+     {
+#if TRACELOCKS
+         Tracer.InfoFormat("Suspend Lock Count before release {0}", scount);
+#endif
+         previous = this.suspendLock.Release();
+#if TRACELOCKS
+         scount = previous != -1 ? scount + 1 : scount;
+         Tracer.InfoFormat("Suspend Lock Count after release {0} previous Value {1}", scount, previous);
+#endif
+     }
+     catch (SemaphoreFullException sfe)
+     {
+         // ignore multiple resume calls
+         // Log for debugging
+         Tracer.DebugFormat("Multiple Resume called on running Dispatcher. Cause: {0}", sfe.Message);
+     }
+     catch (System.IO.IOException ioe)
+     {
+         Tracer.ErrorFormat("Failed resuming or starting Dispatch thread. Cause: {0}", ioe.Message);
+         ex = ioe;
+     }
+     catch (UnauthorizedAccessException uae)
+     {
+         Tracer.Error(uae.StackTrace);
+         ex = uae;
+     }
+     // Failures are reported through the Tracer above and through the out parameter;
+     // library code does not write to the console.
+     return previous;
+ }
+
+ #endregion
+
+ #region Protected Dispatch Methods
+
+ /// <summary>
+ /// Performs the actual close: stops the resource, marks the executor closed, wakes up
+ /// all threads waiting on the queue and then drains what is left in the queue.
+ /// Does nothing when the executor is already closed.
+ /// </summary>
+ protected void CloseOnQueue()
+ {
+ bool ifDrain = false;
+ bool executeDrain = false;
+ lock (queue)
+ {
+ if (!closed)
+ {
+ // Stop() runs before closed is set, while the queue lock is held.
+ Stop();
+ closed = true;
+ executing = false;
+ ifDrain = true;
+ executeDrain = ExecuteDrain;
+ // Wake producers and consumers blocked in Monitor.Wait so they can observe closed.
+ Monitor.PulseAll(queue);
+ Tracer.InfoFormat("DistpachExecutor: {0} Closed.", name);
+ }
+
+ }
+ // Draining happens outside the queue lock, and only for the call that performed the close.
+ if (ifDrain)
+ {
+ // Drain the rest of the queue before closing
+ Drain(executeDrain);
+ }
+ }
+
+ /// <summary>Empties the queue under its lock and returns the removed events in queue order.</summary>
+ protected IExecutable[] DrainOffQueue()
+ {
+     lock (queue)
+     {
+         // ToArray yields the elements in dequeue order; Clear then empties the queue.
+         IExecutable[] pending = queue.ToArray();
+         queue.Clear();
+         return pending;
+     }
+ }
+
+ /// <summary>Empties the queue, optionally running every removed event.</summary>
+ /// <param name="execute">When true each removed event is dispatched; when false they are discarded.</param>
+ protected void Drain(bool execute = false)
+ {
+     // The queue is emptied regardless of the execute flag.
+     IExecutable[] leftovers = DrainOffQueue();
+     if (!execute)
+     {
+         return;
+     }
+     for (int i = 0; i < leftovers.Length; i++)
+     {
+         DispatchEvent(leftovers[i]);
+     }
+ }
+
+ /// <summary>
+ /// Runs the delegate of <paramref name="dispatchEvent"/>, if it has one, and reports
+ /// any exception it throws to the event's OnFailure.
+ /// </summary>
+ protected void DispatchEvent(IExecutable dispatchEvent)
+ {
+     Executable work = dispatchEvent.GetExecutable();
+     if (work == null)
+     {
+         return;
+     }
+
+     try
+     {
+         work.Invoke();
+     }
+     catch (Exception cause)
+     {
+         // connect to exception listener here.
+         dispatchEvent.OnFailure(ExceptionSupport.Wrap(cause, "Dispatch Executor Error ({0}):", this.name));
+     }
+ }
+
+ /// <summary>
+ /// Body of the dispatch thread. Until the executor is closed it waits for the suspend
+ /// lock, hands it straight back, and then dispatches queued events for as long as the
+ /// resource is started.
+ /// </summary>
+ protected void Dispatch()
+ {
+     while (!closed)
+     {
+         // Block here until the suspend lock can be taken or the executor closes.
+         bool locked = false;
+         while (!closed && !(locked = this.AcquireSuspendLock())) { }
+         if (locked)
+         {
+             // The lock is only used as a gate, so release it immediately.
+             int count = this.ReleaseSuspendLock();
+#if TRACELOCKS
+             Tracer.InfoFormat("Dispatch Suspend Lock Count {0}, Current Count {1}", count, count+1);
+#endif
+         }
+         if (closed)
+         {
+             break;
+         }
+
+         while (IsStarted)
+         {
+             IExecutable exe;
+             if (TryDequeue(out exe, DEFAULT_DEQUEUE_TIMEOUT))
+             {
+                 DispatchEvent(exe);
+             }
+             else
+             {
+                 // queue stopped or timed out
+                 Tracer.DebugFormat("Queue {0} did not dispatch due to being Suspended or Closed.", name);
+             }
+         }
+     }
+ }
+
+#endregion
+
+ #region NMSResource Methods
+
+ /// <summary>
+ /// Starts the dispatch thread on first use and releases the suspend lock so that
+ /// dispatching can proceed.
+ /// </summary>
+ protected override void StartResource()
+ {
+     if (!executing)
+     {
+         executing = true;
+         executingThread.Start();
+     }
+
+     // Both the first start and later restarts resume the dispatch thread.
+     Resume();
+ }
+
+ /// <summary>
+ /// Wakes up threads waiting on an empty queue and then takes the suspend lock.
+ /// </summary>
+ protected override void StopResource()
+ {
+     lock (queue)
+     {
+         bool nothingQueued = queue.Count == 0;
+         if (nothingQueued)
+         {
+             // Lets a consumer blocked in TryDequeue re-check the stop/close conditions.
+             Monitor.PulseAll(queue);
+         }
+     }
+
+     Suspend();
+ }
+
+ /// <summary>Throws when the executor has been closed.</summary>
+ /// <exception cref="Apache.NMS.IllegalStateException">The executor is closed.</exception>
+ protected override void ThrowIfClosed()
+ {
+     if (!closed)
+     {
+         return;
+     }
+
+     string typeName = this.GetType().Name;
+     throw new Apache.NMS.IllegalStateException("Illegal Operation on closed " + typeName + ".");
+ }
+
+ #endregion
+
+ #region Public Methods
+
+ /// <summary>
+ /// Closes the Dispatch Executor. See <see cref="DispatchExecutor.Shutdown(bool)"/>.
+ /// </summary>
+ public void Close()
+ {
+     this.Dispose(true);
+     // Everything has been released explicitly, so the finalizer has nothing left to do.
+     GC.SuppressFinalize(this);
+ }
+
+ /// <summary>
+ /// Appends an event to the queue, blocking while the queue is full.
+ /// </summary>
+ /// <param name="o">Event to queue; null is silently ignored.</param>
+ public void Enqueue(IExecutable o)
+ {
+ if(o == null)
+ {
+ return;
+ }
+ lock (queue)
+ {
+ // Wait for room; give up without queueing if the executor is closed while the queue is full.
+ while (queue.Count >= maxSize)
+ {
+ if (closed)
+ {
+ return;
+ }
+ Monitor.Wait(queue);
+ }
+ queue.Enqueue(o);
+ // The queue just went from empty to non-empty: wake up waiting consumers.
+ if (queue.Count == 1)
+ {
+ Monitor.PulseAll(queue);
+ }
+
+ }
+ }
+
+ /// <summary>
+ /// Removes the next event from the queue, waiting while the queue is empty.
+ /// </summary>
+ /// <param name="exe">Receives the dequeued event, or null when nothing was dequeued.</param>
+ /// <param name="timeout">Timeout passed to each Monitor.Wait; values below zero wait without a timeout.</param>
+ /// <returns>
+ /// True when an event was dequeued. False when the queue is empty and the executor is
+ /// closed or stopping, or when the most recent wait timed out.
+ /// </returns>
+ public bool TryDequeue(out IExecutable exe, int timeout = -1)
+ {
+ exe = null;
+ lock (queue)
+ {
+ bool signaled = true;
+ // A timed-out wait does not leave this loop on its own; the loop ends when the queue
+ // is non-empty, or returns when the executor is closed or stopping.
+ while (queue.Count == 0)
+ {
+ if (closed || mode.Value.Equals(Resource.Mode.Stopping))
+ {
+ return false;
+ }
+ signaled = (timeout > -1 ) ? Monitor.Wait(queue, timeout) : Monitor.Wait(queue);
+ }
+ // The last wait timed out even though the queue now has an item: report failure and leave the item queued.
+ if (!signaled)
+ {
+ return false;
+ }
+
+ exe = queue.Dequeue();
+ // The queue just dropped below its maximum size: wake up producers blocked in Enqueue.
+ if (queue.Count == maxSize - 1)
+ {
+ Monitor.PulseAll(queue);
+ }
+
+ }
+ return true;
+ }
+
+ #endregion
+
+ #region IDispose Methods
+
+ /// <summary>
+ /// Shuts down the dispatch thread.
+ /// </summary>
+ /// <param name="join">
+ /// True indicates the shutdown is orderly and therefore can block to join the thread.
+ /// False indicates the shutdown can not block.
+ /// Default value is False.
+ /// </param>
+ internal void Shutdown(bool join = false)
+ {
+ if (IsOnDispatchThread)
+ {
+ // close is called in the Dispatcher Thread so we can just close
+ if (false == closeQueued.GetAndSet(true))
+ {
+ this.CloseOnQueue();
+ }
+ }
+ // Only the first caller to flip closeQueued queues the close; later calls do nothing.
+ else if (closeQueued.CompareAndSet(false, true))
+ {
+ if (!IsStarted && executing)
+ {
+ // resume dispatching thread for Close Message Dispatch Event
+ Start();
+ }
+ // enqueue close
+ this.Enqueue(new DispatchEvent(this.CloseOnQueue));
+
+ if (join && executingThread != null)
+ {
+ // thread join must not happen under lock (queue) statement
+ // A thread that was never started cannot be joined, so it is skipped.
+ if (!executingThread.ThreadState.Equals(ThreadState.Unstarted))
+ {
+ executingThread.Join();
+ }
+ executingThread = null;
+ }
+
+ }
+ }
+
+ /// <summary>
+ /// Shuts the executor down and releases the suspend lock and the queue. Does nothing
+ /// when the executor is already closed.
+ /// </summary>
+ /// <param name="disposing">
+ /// True for an explicit dispose, which may block to join the dispatch thread;
+ /// false for the non-blocking path used by the finalizer.
+ /// </param>
+ protected virtual void Dispose(bool disposing)
+ {
+     if (closed) return;
+     lock (queue)
+     {
+         if (closed) return;
+     }
+
+     // remove reference to dispatcher to be garbage collected
+     if (disposing
+         && executingThread != null
+         && executingThread.ThreadState.Equals(ThreadState.Unstarted))
+     {
+         executingThread = null;
+     }
+
+     // Joins the dispatch thread only on the explicit dispose path.
+     this.Shutdown(disposing);
+     this.suspendLock.Dispose();
+     this.queue = null;
+ }
+
+ /// <summary>Closes the executor; any exception raised while closing is traced and swallowed.</summary>
+ public void Dispose()
+ {
+     try
+     {
+         Close();
+     }
+     catch (Exception failure)
+     {
+         Tracer.DebugFormat("Caught Exception during Dispose for Dispatcher {0}. Exception {1}", name, failure);
+     }
+ }
+
+ #endregion
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-amqp/blob/432c9613/src/main/csharp/Util/ExceptionSupport.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Util/ExceptionSupport.cs b/src/main/csharp/Util/ExceptionSupport.cs
new file mode 100644
index 0000000..8d93564
--- /dev/null
+++ b/src/main/csharp/Util/ExceptionSupport.cs
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using System.Collections;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Reflection;
+using Apache.NMS;
+using Amqp.Framing;
+using Amqp;
+
+namespace Apache.NMS.AMQP.Util
+{
+ class ExceptionSupport
+ {
+ private static readonly Dictionary<string, Type> errTypeMap;
+
+ /// <summary>
+ /// Builds the table that maps an error code (AMQP .Net Lite codes and the NMS specific
+ /// ones) to the NMS exception type created for it.
+ /// </summary>
+ static ExceptionSupport()
+ {
+     errTypeMap = new Dictionary<string, Type>
+     {
+         { NMSErrorCode.CONNECTION_TIME_OUT, typeof(NMSConnectionException) },
+         { ErrorCode.ConnectionRedirect, typeof(NMSConnectionException) },
+         { ErrorCode.ConnectionForced, typeof(NMSConnectionException) },
+         { ErrorCode.IllegalState, typeof(IllegalStateException) },
+
+         { NMSErrorCode.INTERNAL_ERROR, typeof(NMSException) },
+         { NMSErrorCode.UNKNOWN_ERROR, typeof(NMSException) },
+         { NMSErrorCode.SESSION_TIME_OUT, typeof(NMSException) },
+         { NMSErrorCode.LINK_TIME_OUT, typeof(NMSException) },
+         { ErrorCode.DecodeError, typeof(NMSException) },
+         { ErrorCode.DetachForced, typeof(NMSException) },
+         { ErrorCode.ErrantLink, typeof(NMSException) },
+         { ErrorCode.FrameSizeTooSmall, typeof(NMSConnectionException) },
+         { ErrorCode.FramingError, typeof(NMSConnectionException) },
+         { ErrorCode.HandleInUse, typeof(NMSException) },
+         { ErrorCode.InternalError, typeof(NMSException) },
+         { ErrorCode.InvalidField, typeof(NMSException) },
+         { ErrorCode.LinkRedirect, typeof(NMSException) },
+         { ErrorCode.MessageReleased, typeof(IllegalStateException) },
+         { ErrorCode.MessageSizeExceeded, typeof(NMSException) },
+         { ErrorCode.NotAllowed, typeof(NMSException) },
+         { ErrorCode.NotFound, typeof(InvalidDestinationException) },
+         { ErrorCode.NotImplemented, typeof(NMSException) },
+         { ErrorCode.PreconditionFailed, typeof(NMSException) },
+         { ErrorCode.ResourceDeleted, typeof(NMSException) },
+         { ErrorCode.ResourceLimitExceeded, typeof(NMSException) },
+         { ErrorCode.ResourceLocked, typeof(NMSException) },
+         { ErrorCode.Stolen, typeof(NMSException) },
+         { ErrorCode.TransactionRollback, typeof(TransactionRolledBackException) },
+         { ErrorCode.TransactionTimeout, typeof(TransactionInProgressException) },
+         { ErrorCode.TransactionUnknownId, typeof(TransactionRolledBackException) },
+         { ErrorCode.TransferLimitExceeded, typeof(NMSException) },
+         { ErrorCode.UnattachedHandle, typeof(NMSException) },
+         { ErrorCode.UnauthorizedAccess, typeof(NMSSecurityException) },
+         { ErrorCode.WindowViolation, typeof(NMSException) },
+     };
+ }
+
+ /// <summary>
+ /// Returns the public compile-time constant fields of <paramref name="type"/>,
+ /// including those inherited from its base types.
+ /// </summary>
+ private static FieldInfo[] GetConstants(Type type)
+ {
+     const BindingFlags publicStatics =
+         BindingFlags.Static | BindingFlags.Public | BindingFlags.FlattenHierarchy;
+
+     // IsLiteral marks const fields; the IsInitOnly test excludes readonly fields.
+     return type.GetFields(publicStatics)
+         .Where(field => field.IsLiteral && !field.IsInitOnly)
+         .ToArray();
+ }
+
+ /// <summary>Returns the values of the public string constants declared on <paramref name="type"/>.</summary>
+ private static string[] GetStringConstants(Type type)
+ {
+     return GetConstants(type)
+         .Where(constant => constant.FieldType.Equals(typeof(string)))
+         .Select(constant => (string)constant.GetValue(null))
+         .ToArray();
+ }
+
+ /// <summary>Formats the message and delegates to the message-based overload.</summary>
+ public static NMSException GetTimeoutException(IAmqpObject obj, string format, params object[] args)
+ {
+     string message = string.Format(format, args);
+     return GetTimeoutException(obj, message);
+ }
+
+ /// <summary>
+ /// Creates the timeout exception for an AMQP connection, session or link.
+ /// </summary>
+ /// <param name="obj">The AMQP object whose request timed out; any other kind of object yields a null error.</param>
+ /// <param name="message">Text placed at the start of the exception message.</param>
+ public static NMSException GetTimeoutException(IAmqpObject obj, string message)
+ {
+     Error timeout =
+         obj is Amqp.Connection ? NMSError.CONNECTION_TIMEOUT :
+         obj is Amqp.Session ? NMSError.SESSION_TIMEOUT :
+         obj is Amqp.Link ? NMSError.LINK_TIMEOUT :
+         null;
+
+     return GetException(timeout, message);
+ }
+
+ /// <summary>Formats the message, then builds the exception from the error of <paramref name="obj"/>.</summary>
+ public static NMSException GetException(IAmqpObject obj, string format, params object[] args)
+     => GetException(obj, string.Format(format, args));
+
+ /// <summary>Formats the message, then builds the exception from <paramref name="amqpErr"/>.</summary>
+ public static NMSException GetException(Error amqpErr, string format, params object[] args)
+     => GetException(amqpErr, string.Format(format, args));
+
+ /// <summary>Builds the exception from the Error property of <paramref name="obj"/>.</summary>
+ public static NMSException GetException(IAmqpObject obj, string message="")
+     => GetException(obj.Error, message);
+
+ /// <summary>
+ /// Translates an AMQP error into the NMS exception type registered for its condition.
+ /// </summary>
+ /// <param name="amqpErr">The AMQP error; null is treated as the NMS internal error.</param>
+ /// <param name="message">Text placed in front of the error description and error info.</param>
+ /// <param name="e">
+ /// Exception to use as the inner exception. When null, an NMSProviderError is created
+ /// so that the stack up to this call is available to exception listeners.
+ /// </param>
+ /// <returns>
+ /// An instance of the mapped exception type, or a plain NMSException when the condition
+ /// is not mapped or the mapped type cannot be constructed from (message, code, inner).
+ /// </returns>
+ public static NMSException GetException(Error amqpErr, string message = "", Exception e = null)
+ {
+     if (amqpErr == null)
+     {
+         amqpErr = NMSError.INTERNAL;
+     }
+
+     string errCode = amqpErr.Condition.ToString();
+     string errMessage = amqpErr.Description != null ? ", Description = " + amqpErr.Description : "";
+
+     string additionalErrInfo = null;
+     if (amqpErr.Info != null && amqpErr.Info.Count > 0)
+     {
+         additionalErrInfo = ", ErrorInfo = " + Types.ConversionSupport.ToString(amqpErr.Info);
+     }
+
+     string fullMessage = message + errMessage + additionalErrInfo;
+
+     if (null == e)
+     {
+         // No exception given: create an NMSProviderError to hold the stack and use it as
+         // the inner exception, so exception listeners can see the stack to here.
+         // The constructor takes (message, errorCode), in that order.
+         e = new NMSProviderError(fullMessage, errCode);
+     }
+
+     Type exType;
+     if (errTypeMap.TryGetValue(errCode, out exType))
+     {
+         ConstructorInfo ci = exType.GetConstructor(new[] { typeof(string), typeof(string), typeof(Exception) });
+         // Fall through to a plain NMSException when the mapped type lacks the expected
+         // constructor or is not an NMSException.
+         NMSException mapped = ci != null
+             ? ci.Invoke(new object[] { fullMessage, errCode, e }) as NMSException
+             : null;
+         if (mapped != null)
+         {
+             return mapped;
+         }
+     }
+
+     return new NMSException(fullMessage, errCode, e);
+ }
+
+ /// <summary>Formats the message and delegates to the message-based overload.</summary>
+ public static NMSException Wrap(Exception e, string format, params object[] args)
+ {
+     string message = string.Format(format, args);
+     return Wrap(e, message);
+ }
+
+ /// <summary>
+ /// Converts an arbitrary exception into an NMSException.
+ /// </summary>
+ /// <param name="e">Exception to convert; null yields null.</param>
+ /// <param name="message">Message for the new exception; when null or empty, the message of <paramref name="e"/> is used for non-AMQP exceptions.</param>
+ /// <returns>
+ /// <paramref name="e"/> itself when it already is an NMSException, the translation of
+ /// its AMQP error when it is an AmqpException, otherwise a new NMSException carrying
+ /// the internal error code with <paramref name="e"/> as inner exception.
+ /// </returns>
+ public static NMSException Wrap(Exception e, string message = "")
+ {
+     if (e == null)
+     {
+         return null;
+     }
+
+     string fallbackText = (message == null || message.Length == 0) ? e.Message : message;
+
+     if (e is NMSException alreadyNms)
+     {
+         return alreadyNms;
+     }
+
+     if (e is AmqpException amqpFailure)
+     {
+         // The caller's message is passed as given here, not the fallback text.
+         NMSException translated = GetException(amqpFailure.Error, message, e);
+         Tracer.DebugFormat("Encoutered AmqpException {0} and created NMS Exception {1}.", e, translated);
+         return translated;
+     }
+
+     return new NMSException(fallbackText, NMSErrorCode.INTERNAL_ERROR, e);
+ }
+
+ }
+
+ #region Exceptions
+
+
+ /// <summary>
+ /// Raised for an invalid property; carries the NMS property error code.
+ /// </summary>
+ public class InvalidPropertyException : NMSException
+ {
+     // Message template: {0} is the property, {1} the cause.
+     protected static string ExFormat = "Invalid Property {0}. Cause: {1}";
+
+     /// <param name="property">Name of the offending property.</param>
+     /// <param name="message">Description of why the property is invalid.</param>
+     public InvalidPropertyException(string property, string message)
+         : base(string.Format(ExFormat, property, message))
+     {
+         this.exceptionErrorCode = NMSErrorCode.PROPERTY_ERROR;
+     }
+ }
+ // The API converts AMQP errors to NMSProviderError. This is typically added to
+ // the exception queue and passed to the ExceptionListener. As the exception is
+ // instantiated but never thrown, it does not have an exception stack trace. One is
+ // captured into the private member InstanceTrace and StackTrace is overridden so
+ // that useful information can be displayed.
+
+ internal class NMSProviderError : NMSException
+ {
+     // Stack captured in the constructor, skipping the constructor's own frame.
+     private string InstanceTrace;
+
+     public NMSProviderError() : base()
+     {
+         InstanceTrace = new System.Diagnostics.StackTrace(1, true).ToString();
+     }
+
+     public NMSProviderError(string message) : base(message)
+     {
+         InstanceTrace = new System.Diagnostics.StackTrace(1, true).ToString();
+     }
+
+     public NMSProviderError(string message, string errorCode) : base(message, errorCode)
+     {
+         InstanceTrace = new System.Diagnostics.StackTrace(1, true).ToString();
+     }
+
+     // Takes over the inner exception's error code, or the internal error code when it has none.
+     public NMSProviderError(string message, NMSException innerException) : base(message, innerException)
+     {
+         InstanceTrace = new System.Diagnostics.StackTrace(1, true).ToString();
+         exceptionErrorCode = innerException.ErrorCode ?? NMSErrorCode.INTERNAL_ERROR;
+     }
+
+     public NMSProviderError(string message, Exception innerException) : base(message, innerException)
+     {
+         InstanceTrace = new System.Diagnostics.StackTrace(1, true).ToString();
+     }
+
+     public NMSProviderError(string message, string errorCode, Exception innerException) : base(message, errorCode, innerException)
+     {
+         InstanceTrace = new System.Diagnostics.StackTrace(1, true).ToString();
+     }
+
+     /// <summary>
+     /// The regular stack trace when there is one, otherwise the trace captured at
+     /// construction; the inner exception's message and trace are appended when available.
+     /// </summary>
+     public override string StackTrace
+     {
+         get
+         {
+             string stack = base.StackTrace;
+             if (string.IsNullOrEmpty(stack))
+             {
+                 stack = InstanceTrace;
+             }
+
+             Exception cause = InnerException;
+             if (cause != null && !string.IsNullOrEmpty(cause.StackTrace))
+             {
+                 stack += "\nCause " + cause.Message + " : \n" + cause.StackTrace;
+             }
+             return stack;
+         }
+     }
+ }
+
+ #endregion
+
+ #region Error Codes
+
+ /// <summary>
+ /// Predefined AMQP Error instances for conditions raised by the NMS provider itself.
+ /// </summary>
+ internal static class NMSError
+ {
+     // Each timeout error carries the code of the kind of object that timed out, so that
+     // ExceptionSupport's code-to-exception map selects the matching exception type.
+     public static Error SESSION_TIMEOUT = new Error(NMSErrorCode.SESSION_TIME_OUT) { Description = "Session Begin Request has timed out." };
+     public static Error CONNECTION_TIMEOUT = new Error(NMSErrorCode.CONNECTION_TIME_OUT) { Description = "Connection Open Request has timed out." };
+     public static Error LINK_TIMEOUT = new Error(NMSErrorCode.LINK_TIME_OUT) { Description = "Link Attach Request has timed out." };
+     public static Error PROPERTY = new Error(NMSErrorCode.PROPERTY_ERROR) { Description = "Property Error." };
+     public static Error UNKNOWN = new Error(NMSErrorCode.UNKNOWN_ERROR) { Description = "Unknown Error." };
+     public static Error INTERNAL = new Error(NMSErrorCode.INTERNAL_ERROR) { Description = "Internal Error." };
+
+ }
+ /// <summary>
+ /// Error code strings for conditions raised by the NMS provider itself; they are used
+ /// as keys in ExceptionSupport's code-to-exception map.
+ /// </summary>
+ internal static class NMSErrorCode
+ {
+ // NOTE(review): "timout" is spelled differently from LINK_TIME_OUT's "timeout"; the values are runtime strings, so check consumers before changing them.
+ public static string CONNECTION_TIME_OUT = "nms:connection:timout";
+ public static string SESSION_TIME_OUT = "nms:session:timout";
+ public static string LINK_TIME_OUT = "nms:link:timeout";
+ public static string PROPERTY_ERROR = "nms:property:error";
+ public static string UNKNOWN_ERROR = "nms:unknown";
+ public static string INTERNAL_ERROR = "nms:internal";
+ }
+
+ #endregion
+}